-#!/usr/bin/python
+#!/usr/bin/env /usr/share/plc_api/plcsh
#
# Test script example
#
random = Random()
config = Config()
-auth = config.auth
-try: boot_states = config.api.GetBootStates(auth)
+try: boot_states = GetBootStates()
except: boot_states = [u'boot', u'dbg', u'inst', u'new', u'rcnf', u'rins']
-try: roles = [role['name'] for role in config.api.GetRoles(auth)]
+try: roles = [role['name'] for role in GetRoles()]
except: roles = [u'admin', u'pi', u'user', u'tech']
-try: methods = config.api.GetNetworkMethods(auth)
+try: methods = GetNetworkMethods()
except: methods = [u'static', u'dhcp', u'proxy', u'tap', u'ipmi', u'unknown']
-try:types = config.api.GetNetworkTypes(auth)
+try: key_types = GetKeyTypes()
+except: key_types = [u'ssh']
+
+try:types = GetNetworkTypes()
except: types = [u'ipv4']
def randfloat(min = 0.0, max = 1.0):
slices = 4,
initscripts = 4,
):
- self.api = self.config.api
- self.auth = self.config.auth
- self.all_methods = set(self.api.system.listMethods())
+ self.all_methods = set(system.listMethods())
self.methods_tested = set()
self.methods_failed = set()
self.site_ids = self.Sites(sites)
#self.peer_ids = self.Peers(peers)
self.address_type_ids = self.AddressTypes(address_types)
- self.addresse_ids = self.Addresses(addresses)
+ self.address_ids = self.Addresses(addresses)
#self.conf_files = self.ConfFiles(conf_files)
#self.network_method_ids = self.NetworkMethods()
#self.network_type_ids = self.NetworkTypes()
#self.pcus_ids = self.PCUs(pcus)
#self.pcu_types_ids = self.PCUTypes(pcu_types)
#self.role_ids = self.Roles(roles)
- self.person_ids = self.Persons(persons)
- self.key_ids = self.Keys(keys)
#self.key_types = self.KeyTypes(key_types)
#self.slice_attribute_type_ids = self.SliceAttributeTypes(slice_attribute_types)
#self.slice_instantiation_ids = self.SliceInstantiations(slice_instantiations)
- #self.slice_ids = self.Slices(slices)
+ self.slice_ids = self.Slices(slices)
#self.slice_attribute_ids = self.SliceAttributes(slice_attributes)
#self.initscript_ids = self.InitScripts(initscripts)
-
+ self.person_ids = self.Persons(persons)
+ self.key_ids = self.Keys(keys)
# self.message_ids = self.Messages(messages)
#self.NotifPersons()
# Test GetEventObject only
def debug(self, method, method_name=None):
if method_name is None:
- method_name = method._Method__name
+ method_name = method.name
self.methods_tested.update([method_name])
def wrapper(*args, **kwds):
if hasattr(self, 'pcu_ids'): self.DeletePCUs()
if hasattr(self, 'pcu_protocol_type_ids'): self.DeleteProtocolTypes()
if hasattr(self, 'node_network_setting_ids'): self.DeleteNodeNetworkSettings()
- if hasattr(self, 'attress_ids'): self.DeleteAddresses()
+ if hasattr(self, 'address_ids'): self.DeleteAddresses()
if hasattr(self, 'attress_type_ids'): self.DeleteAddressTypes()
if hasattr(self, 'node_ids'): self.DeleteNodes()
if hasattr(self, 'site_ids'): self.DeleteSites()
for i in range(n):
# Add Site
site_fields = random_site()
- AddSite = self.debug(self.api.AddSite)
- site_id = AddSite(self.auth, site_fields)
+ AddSite = self.debug(shell.AddSite)
+ site_id = AddSite(site_fields)
if site_id is None: continue
# Should return a unique id
self.isunique(site_id, site_ids, 'AddSite - isunique')
site_ids.append(site_id)
- GetSites = self.debug(self.api.GetSites)
- sites = GetSites(self.auth, [site_id])
+ GetSites = self.debug(shell.GetSites)
+ sites = GetSites([site_id])
if sites is None: continue
site = sites[0]
self.isequal(site, site_fields, 'AddSite - isequal')
# Update site
site_fields = random_site()
- UpdateSite = self.debug(self.api.UpdateSite)
- result = UpdateSite(self.auth, site_id, site_fields)
+ UpdateSite = self.debug(shell.UpdateSite)
+ result = UpdateSite(site_id, site_fields)
# Check again
- sites = GetSites(self.auth, [site_id])
+ sites = GetSites([site_id])
if sites is None: continue
site = sites[0]
self.isequal(site, site_fields, 'UpdateSite - isequal')
- sites = GetSites(self.auth, site_ids)
+ sites = GetSites(site_ids)
if sites is not None:
self.islistequal(site_ids, [site['site_id'] for site in sites], 'GetSites - isequal')
def DeleteSites(self):
# Delete all sites
- DeleteSite = self.debug(self.api.DeleteSite)
+ DeleteSite = self.debug(shell.DeleteSite)
for site_id in self.site_ids:
- result = DeleteSite(self.auth, site_id)
+ result = DeleteSite(site_id)
# Check if sites are deleted
- GetSites = self.debug(self.api.GetSites)
- sites = GetSites(self.auth, self.site_ids)
+ GetSites = self.debug(shell.GetSites)
+ sites = GetSites(self.site_ids)
self.islistequal(sites, [], 'DeleteSite - check')
if self.config.verbose:
# Add Node
node_fields = random_node()
site_id = random.sample(self.site_ids, 1)[0]
- AddNode = self.debug(self.api.AddNode)
- node_id = AddNode(self.auth, site_id, node_fields)
+ AddNode = self.debug(shell.AddNode)
+ node_id = AddNode(site_id, node_fields)
if node_id is None: continue
# Should return a unique id
node_ids.append(node_id)
# Check nodes
- GetNodes = self.debug(self.api.GetNodes)
- nodes = GetNodes(self.auth, [node_id])
+ GetNodes = self.debug(shell.GetNodes)
+ nodes = GetNodes([node_id])
if nodes is None: continue
node = nodes[0]
self.isequal(node, node_fields, 'AddNode - isequal')
# Update node
node_fields = random_node()
- UpdateNode = self.debug(self.api.UpdateNode)
- result = UpdateNode(self.auth, node_id, node_fields)
+ UpdateNode = self.debug(shell.UpdateNode)
+ result = UpdateNode(node_id, node_fields)
# Check again
- nodes = GetNodes(self.auth, [node_id])
+ nodes = GetNodes([node_id])
if nodes is None: continue
node = nodes[0]
self.isequal(node, node_fields, 'UpdateNode - isequal')
- nodes = GetNodes(self.auth, node_ids)
+ nodes = GetNodes(node_ids)
if nodes is not None:
self.islistequal(node_ids, [node['node_id'] for node in nodes], 'GetNodes - isequal')
return node_ids
def DeleteNodes(self):
- DeleteNode = self.debug(self.api.DeleteNode)
+ DeleteNode = self.debug(shell.DeleteNode)
for node_id in self.node_ids:
- result = DeleteNode(self.auth, node_id)
+ result = DeleteNode(node_id)
# Check if nodes are deleted
- GetNodes = self.debug(self.api.GetNodes)
- nodes = GetNodes(self.api, self.node_ids)
+ GetNodes = self.debug(shell.GetNodes)
+ nodes = GetNodes(self.node_ids)
self.islistequal(nodes, [], 'DeleteNode Check')
if self.config.verbose:
address_type_ids = []
for i in range(n):
address_type_fields = random_address_type()
- AddAddressType = self.debug(self.api.AddAddressType)
- address_type_id = AddAddressType(self.auth, address_type_fields)
+ AddAddressType = self.debug(shell.AddAddressType)
+ address_type_id = AddAddressType(address_type_fields)
if address_type_id is None: continue
# Should return a unique address_type_id
address_type_ids.append(address_type_id)
# Check address type
- GetAddressTypes = self.debug(self.api.GetAddressTypes)
- address_types = GetAddressTypes(self.auth, [address_type_id])
+ GetAddressTypes = self.debug(shell.GetAddressTypes)
+ address_types = GetAddressTypes([address_type_id])
if address_types is None: continue
address_type = address_types[0]
self.isequal(address_type, address_type_fields, 'AddAddressType - isequal')
# Update address type
address_type_fields = random_address_type()
- UpdateAddressType = self.debug(self.api.UpdateAddressType)
- result = UpdateAddressType(self.auth, address_type_id, address_type_fields)
+ UpdateAddressType = self.debug(shell.UpdateAddressType)
+ result = UpdateAddressType(address_type_id, address_type_fields)
if result is None: continue
# Check address type again
- address_types = GetAddressTypes(self.auth, [address_type_id])
+ address_types = GetAddressTypes([address_type_id])
if address_types is None: continue
address_type = address_types[0]
self.isequal(address_type, address_type_fields, 'UpdateAddressType - isequal')
# Check get all address types
- address_types = GetAddressTypes(self.auth, address_type_ids)
+ address_types = GetAddressTypes(address_type_ids)
if address_types is not None:
self.islistequal(address_type_ids, [address_type['address_type_id'] for address_type in address_types], 'GetAddressTypes - isequal')
if self.config.verbose:
- print "Added address types", address_type_ids
+ utils.header("Added address types: %s " % address_type_ids)
return address_type_ids
def DeleteAddressTypes(self):
- DeleteAddressType = self.debug(self.api.DeleteAddressType)
+ DeleteAddressType = self.debug(shell.DeleteAddressType)
for address_type_id in self.address_type_ids:
- DeleteAddressType(auth, address_type_id)
+ DeleteAddressType(ddress_type_id)
- GetAddressTypes = self.debug(self.api.GetAddressTypes)
- address_types = GetAddressTypes(self.auth, self.address_type_ids)
+ GetAddressTypes = self.debug(shell.GetAddressTypes)
+ address_types = GetAddressTypes(self.address_type_ids)
self.islistequal(address_types, [], 'DeleteAddressType - check')
if self.config.verbose:
for i in range(n):
address_fields = random_address()
site_id = random.sample(self.site_ids, 1)[0]
- AddSiteAddress = self.debug(self.api.AddSiteAddress)
- address_id = AddSiteAddress(self.auth, site_id, address_fields)
+ AddSiteAddress = self.debug(shell.AddSiteAddress)
+ address_id = AddSiteAddress(site_id, address_fields)
if address_id is None: continue
# Should return a unique address_id
address_ids.append(address_id)
# Check address
- GetAddresses = self.debug(self.api.GetAddresses)
- addresses = GetAddresses(self.auth, [address_id])
+ GetAddresses = self.debug(shell.GetAddresses)
+ addresses = GetAddresses([address_id])
if addresses is None: continue
address = addresses[0]
self.isequal(address, address_fields, 'AddSiteAddress - isequal')
# Update address
address_fields = random_address()
- UpdateAddress = self.debug(self.api.UpdateAddress)
- result = UpdateAddress(self.auth, address_id, address_fields)
+ UpdateAddress = self.debug(shell.UpdateAddress)
+ result = UpdateAddress(address_id, address_fields)
# Check again
- addresses = GetAddresses(self.auth, [address_id])
+ addresses = GetAddresses([address_id])
if addresses is None: continue
address = addresses[0]
self.isequal(address, address_fields, 'UpdateAddress - isequal')
- addresses = GetAddress(self.auth, address_ids)
+ addresses = GetAddresses(address_ids)
if addresses is not None:
- slef.islistequal(address_ids, [ad['address_id'] for ad in addresses], 'GetAddresses - isequal')
+ self.islistequal(address_ids, [ad['address_id'] for ad in addresses], 'GetAddresses - isequal')
if self.config.verbose:
- utils.header("Added addresses: %s" % self.address_ids)
+ utils.header("Added addresses: %s" % address_ids)
return address_ids
def DeleteAddresses(self):
- DeleteAddress = self.debug(self.api.DeleteAddress)
+ DeleteAddress = self.debug(shell.DeleteAddress)
# Delete site addresses
for address_id in self.address_ids:
- result = DeleteAddress(self.auth, address_id)
+ result = DeleteAddress(address_id)
# Check
- GetAddresses = self.debug(self.api.GetAddresses)
- addresses = GetAddresses(self.api, self.address_ids)
+ GetAddresses = self.debug(shell.GetAddresses)
+ addresses = GetAddresses(self.address_ids)
self.islistequal(addresses, [], 'DeleteAddress - check')
- if self.verbose:
- print "Deleted addresses", self.address_ids
+ if self.config.verbose:
+ utils.header("Deleted addresses: %s" % self.address_ids)
self.address_ids = []
- def AddPersons(self, n = 3):
+ def Slices(self, n = 3):
+ slice_ids = []
+ for i in range(n):
+ # Add Site
+ slice_fields = random_slice()
+ AddSlice = self.debug(shell.Slice)
+ slice_id = AddSlice(slice_fields)
+ if slice_id is None: continue
+
+ # Should return a unique id
+ self.isunique(slice_id, slice_ids, 'AddSlicel - isunique')
+ slice_ids.append(slice_id)
+ GetSlices = self.debug(shell.GetSlices)
+ slices = GetSlices([slice_id])
+ if slices is None: continue
+ slice = slices[0]
+ self.isequal(slice, slice_fields, 'AddSlice - isequal')
+
+ # Update slice
+ slice_fields = random_slice()
+ UpdateSite = self.debug(shell.UpdateSlice)
+ result = UpdateSlice(slice_id, slice_fields)
+
+ # Check again
+ slices = GetSites([slice_id])
+ if slices is None: continue
+ slice = slices[0]
+ self.isequal(slice, slice_fields, 'UpdateSlice - isequal')
+
+ # XX Add attribute
+
+ # XX Add node
+
+ slices = GetSites(slice_ids)
+ if slices is not None:
+ self.islistequal(slice_ids, [slice['slice_id'] for slice in slices], 'GetSlices - isequal')
- roles = GetRoles()
- role_ids = [role['role_id'] for role in roles]
- roles = [role['name'] for role in roles]
- roles = dict(zip(roles, role_ids))
+ if self.config.verbose:
+ utils.header("Added slices: %s" % slice_ids)
- for i in range(n):
+ return slice_ids
- # Add account
- person_fields = random_person()
- person_id = AddPerson(person_fields)
+ def DeleteSlices(self):
+
+ # XX manually delete attributes for first slice
+ slices = GetSlices(self.slice_ids, ['slice_attribute_ids', 'node_ids'])
- # Should return a unique person_id
- assert person_id not in self.person_ids
- self.person_ids.append(person_id)
-
- if self.check:
- # Check account
- person = GetPersons([person_id])[0]
- for field in person_fields:
- if field != 'password':
- assert person[field] == person_fields[field]
-
- # Update account
- person_fields = random_person()
- UpdatePerson(person_id, person_fields)
-
- # Check account again
- person = GetPersons([person_id])[0]
- for field in person_fields:
- if field != 'password':
- assert person[field] == person_fields[field]
-
- auth = {'AuthMethod': "password",
- 'Username': person_fields['email'],
- 'AuthString': person_fields['password']}
-
- if self.check:
- # Check that account is disabled
- try:
- assert not AuthCheck(auth)
- except:
- pass
-
- # Add random set of roles
- person_roles = random.sample(['user', 'pi', 'tech'], randint(1, 3))
- for person_role in person_roles:
- role_id = roles[person_role]
- AddRoleToPerson(role_id, person_id)
-
- if self.check:
- person = GetPersons([person_id])[0]
- assert set(person_roles) == set(person['roles'])
-
- # Enable account
- UpdatePerson(person_id, {'enabled': True})
-
- if self.check:
- # Check that account is enabled
- assert AuthCheck(auth)
-
- # Associate account with random set of sites
- person_site_ids = []
- for site_id in random.sample(self.site_ids, randint(1, len(self.site_ids))):
- AddPersonToSite(person_id, site_id)
- person_site_ids.append(site_id)
-
- if self.check:
- # Make sure it really did it
- person = GetPersons([person_id])[0]
- assert set(person_site_ids) == set(person['site_ids'])
-
- # Set a primary site
- primary_site_id = random.sample(person_site_ids, randint(1, len(person_site_ids)))[0]
- SetPersonPrimarySite(person_id, primary_site_id)
-
- if self.check:
- person = GetPersons([person_id])[0]
- assert person['site_ids'][0] == primary_site_id
+
+ DeleteSlice = self.debug(shell.DeleteSlice)
+ # Have DeleteSlice automatically delete attriubtes for the rest
+ for slice_id in self.slice_ids:
+ # Delete account
+ DeleteSlice(slice_id)
+
+ # Check if persons are deleted
+ GetSlices = self.debug(shell.GetSlices)
+ slices = GetSlices(self.slice_ids)
+ self.islistequal(slices, [], 'DeleteSlice - check')
if self.verbose:
- print "Added users", self.person_ids
+ utils.header("Deleted slices: %s" % self.slice_ids)
- def DeletePersons(self):
- # Delete users
- for person_id in self.person_ids:
- # Remove from each site
- for site_id in self.site_ids:
- DeletePersonFromSite(person_id, site_id)
+ self.slice_ids = []
- if self.check:
- person = GetPersons([person_id])[0]
- assert not person['site_ids']
- # Revoke roles
- person = GetPersons([person_id])[0]
- for role_id in person['role_ids']:
- DeleteRoleFromPerson(role_id, person_id)
+ def Persons(self, n = 3):
- if self.check:
- person = GetPersons([person_id])[0]
- assert not person['role_ids']
+ person_ids = []
+ for i in range(n):
- # Disable account
- UpdatePerson(person_id, {'enabled': False})
+ # Add account
+ person_fields = random_person()
+ AddPerson = self.debug(shell.AddPerson)
+ person_id = AddPerson(person_fields)
+ if person_id is None: continue
+
+ # Should return a unique person_id
+ self.isunique(person_id, person_ids, 'AddPerson - isunique')
+ person_ids.append(person_id)
+ GetPersons = self.debug(shell.GetPersons)
+ persons = GetPersons([person_id])
+ if persons is None: continue
+ person = persons[0]
+ self.isequal(person, person_fields, 'AddPerson - isequal')
+
+ # Update account
+ person_fields = random_person()
+ person_fields['enabled'] = True
+ UpdatePerson = self.debug(shell.UpdatePerson)
+ result = UpdatePerson(person_id, person_fields)
+
+ # Add random role
+ AddRoleToPerson = self.debug(shell.AddRoleToPerson)
+ role = random.sample(roles, 1)[0]
+ result = AddRoleToPerson(role, person_id)
+
+ # Add key to person
+ key = random_key()
+ key_id = AddPersonKey = self.debug(shell.AddPersonKey)
+ AddPersonKey(person_id, key)
+
+ # Add person to site
+ site_id = random.sample(self.site_ids, 1)[0]
+ AddPersonToSite = self.debug(shell.AddPersonToSite)
+ AddPersonToSite(person_id, site_id)
+
+ # Add person to slice
+ slice_id = random.sample(self.slice_ids, 1)[0]
+ AddPersonToSlice = self.debug(self.AddPersonToSlice)
+ AddPersonToSlice(person_id, slice_id)
+
+ # check role, key, site, slice
+ persons = GetPersons([person_id], ['roles', 'key_ids', 'site_ids', 'slice_ids'])
+ if persons is None or not persons: continue
+ person = persons[0]
+ self.islistequal([role], person['roles'], 'AddRoleToPerson - check')
+ self.islistequal([key_id], person['key_ids'], 'AddPersonKey - check')
+ self.islistequal([site_id], person['site_ids'], 'AddPersonToSite - check')
+ self.islistequal([slice_id], person['slice_ids'], 'AddPersonToSlice - check')
+
+ persons = GetPersons(person_ids)
+ if persons is not None:
+ self.islistequal(person_ids, [p['person_id'] for p in persons], 'GetPersons - isequal')
- if self.check:
- person = GetPersons([person_id])[0]
- assert not person['enabled']
+ if self.verbose:
+ utils.header("Added users: %s" % self.person_ids)
+ def DeletePersons(self):
+
+ # Delete attributes manually for first person
+ persons = GetPersons(self.person_ids, ['person_id' , 'key_ids', 'site_ids', 'slice_ids'])
+ if persons is None or not persons: return 0
+ person = persons[0]
+
+ # Delete role
+ DeleteRoleFromPerson = self.debug(shell.DeleteRoleFromPerson)
+ DeleteRoleFromPerson(person['role'], person['person_id'])
+
+ # Delete key
+ DeleteKey = self.debug(shell.DeleteKey)
+ DeleteKey(person['key_id'])
+
+ # Remove person from site
+ DeletePersonFromSite = self.debug(shell.DeletePersonFromSite)
+ DeletePersonFromSite(person['person_id'], person['site_id'])
+
+ # Remove person from slice
+ DeletePersonFromSlice = self.debug(shell.DeletePersonFromSlice)
+ DeletePersonFromSlice(person['person_id'], person['slice_id'])
+
+ # check role, key, site, slice
+ persons = GetPersons([person['person_id']], ['roles', 'key_ids', 'site_ids', 'slice_ids'])
+ if persons is None or not persons: return 0
+ person = persons[0]
+ self.islistequal([], person['roles'], 'DeleteRoleFromPerson - check')
+ self.islistequal([], person['key_ids'], 'DeleteKey - check')
+ self.islistequal([], person['site_ids'], 'DeletePersonFromSite - check')
+ self.islistequal([], person['slice_ids'], 'DeletePersonFromSlice - check')
+
+ DeletePerson = self.debug(shell.DeletePerson)
+ # Have DeletePeson automatically delete attriubtes for all other persons
+ for person_id in self.person_ids:
# Delete account
DeletePerson(person_id)
- if self.check:
- assert not GetPersons([person_id])
-
- if self.check:
- assert not GetPersons(self.person_ids)
-
- if self.verbose:
- print "Deleted users", self.person_ids
+ # Check if persons are deleted
+ GetPersons = self.debug(shell.GetPersons)
+ persons = GetPersons(self.person_ids)
+ self.islistequal(persons, [], 'DeletePerson - check')
+
+ if self.verbose:
+ utils.header("Deleted users: %s" % self.person_ids)
self.person_ids = []