5 # Copyright (C) 2006 The Trustees of Princeton University
9 from pprint import pprint
10 from string import letters, digits, punctuation
11 from traceback import print_exc
20 from qa.Config import Config
21 from qa.logger import Logfile, log
22 from random import Random
29 try: boot_states = config.api.GetBootStates(auth)
30 except: boot_states = [u'boot', u'dbg', u'inst', u'new', u'rcnf', u'rins']
32 try: roles = [role['name'] for role in config.api.GetRoles(auth)]
33 except: roles = [u'admin', u'pi', u'user', u'tech']
35 try: methods = config.api.GetNetworkMethods(auth)
36 except: methods = [u'static', u'dhcp', u'proxy', u'tap', u'ipmi', u'unknown']
38 try:types = config.api.GetNetworkTypes(auth)
39 except: types = [u'ipv4']
41 def randfloat(min = 0.0, max = 1.0):
42 return float(min) + (random.random() * (float(max) - float(min)))
44 def randint(min = 0, max = 1):
45 return int(randfloat(min, max + 1))
47 # See "2.2 Characters" in the XML specification:
49 # #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD]
51 # [#x7F-#x84], [#x86-#x9F], [#xFDD0-#xFDDF]
53 ascii_xml_chars = map(unichr, [0x9, 0xA, 0xD])
54 ascii_xml_chars += map(unichr, xrange(0x20, 0x7F - 1))
55 low_xml_chars = list(ascii_xml_chars)
56 low_xml_chars += map(unichr, xrange(0x84 + 1, 0x86 - 1))
57 low_xml_chars += map(unichr, xrange(0x9F + 1, 0xFF))
58 valid_xml_chars = list(low_xml_chars)
59 valid_xml_chars += map(unichr, xrange(0xFF + 1, 0xD7FF))
60 valid_xml_chars += map(unichr, xrange(0xE000, 0xFDD0 - 1))
61 valid_xml_chars += map(unichr, xrange(0xFDDF + 1, 0xFFFD))
63 def randstr(length, pool = valid_xml_chars, encoding = "utf-8"):
64 sample = random.sample(pool, min(length, len(pool)))
67 bytes = len(s.encode(encoding))
71 sample += random.sample(pool, min(length - bytes, len(pool)))
72 random.shuffle(sample)
78 # 1. Each part begins and ends with a letter or number.
79 # 2. Each part except the last can contain letters, numbers, or hyphens.
80 # 3. Each part is between 1 and 64 characters, including the trailing dot.
81 # 4. At least two parts.
82 # 5. Last part can only contain between 2 and 6 letters.
83 hostname = 'a' + randstr(61, letters + digits + '-') + '1.' + \
84 'b' + randstr(61, letters + digits + '-') + '2.' + \
85 'c' + randstr(5, letters)
90 for i in range(randint(1, 10)):
91 parts.append(randstr(randint(1, 30), ascii_xml_chars))
92 return os.sep.join(parts)[0:length]
95 return (randstr(100, letters + digits) + "@" + randhostname()).lower()
97 def randkey(bits = 2048):
98 key_types = ["ssh-dss", "ssh-rsa"]
99 key_type = random.sample(key_types, 1)[0]
100 return ' '.join([key_type,
101 base64.b64encode(''.join(randstr(bits / 8).encode("utf-8"))),
106 'name': randstr(254),
107 'abbreviated_name': randstr(50),
108 'login_base': randstr(20, letters).lower(),
109 'latitude': int(randfloat(-90.0, 90.0) * 1000) / 1000.0,
110 'longitude': int(randfloat(-180.0, 180.0) * 1000) / 1000.0,
113 def random_address_type():
116 'description': randstr(254),
119 def random_address():
121 'line1': randstr(254),
122 'line2': randstr(254),
123 'line3': randstr(254),
124 'city': randstr(254),
125 'state': randstr(254),
126 'postalcode': randstr(64),
127 'country': randstr(128),
132 'first_name': randstr(128),
133 'last_name': randstr(128),
134 'email': randemail(),
136 # Accounts are disabled by default
138 'password': randstr(254),
143 'key_type': random.sample(key_types, 1)[0],
149 'name': site['login_base'] + "_" + randstr(11, letters).lower(),
150 'url': "http://" + randhostname() + "/",
151 'description': randstr(2048),
154 def random_nodegroup():
157 'description': randstr(200),
162 'hostname': randhostname(),
163 'boot_state': random.sample(boot_states, 1)[0],
164 'model': randstr(255),
165 'version': randstr(64),
168 def random_nodenetwork():
169 nodenetwork_fields = {
170 'method': random.sample(methods, 1)[0],
171 'type': random.sample(types, 1)[0],
172 'bwlimit': randint(500000, 10000000),
176 ip = randint(0, 0xffffffff)
177 netmask = (0xffffffff << randint(2, 31)) & 0xffffffff
178 network = ip & netmask
179 broadcast = ((ip & netmask) | ~netmask) & 0xffffffff
180 gateway = randint(network + 1, broadcast - 1)
181 dns1 = randint(0, 0xffffffff)
183 for field in 'ip', 'netmask', 'network', 'broadcast', 'gateway', 'dns1':
184 nodenetwork_fields[field] = socket.inet_ntoa(struct.pack('>L', locals()[field]))
186 return nodenetwork_fields
190 'hostname': randhostname(),
191 'ip': socket.inet_ntoa(struct.pack('>L', randint(0, 0xffffffff))),
192 'protocol': randstr(16),
193 'username': randstr(254),
194 'password': randstr(254),
195 'notes': randstr(254),
196 'model': randstr(32),
199 def random_conf_file():
201 'enabled': bool(randint()),
202 'source': randpath(255),
203 'dest': randpath(255),
204 'file_permissions': "%#o" % randint(0, 512),
205 'file_owner': randstr(32, letters + '_' + digits),
206 'file_group': randstr(32, letters + '_' + digits),
207 'preinstall_cmd': randpath(100),
208 'postinstall_cmd': randpath(100),
209 'error_cmd': randpath(100),
210 'ignore_cmd_errors': bool(randint()),
211 'always_update': bool(randint()),
214 def random_attribute_type():
216 'name': randstr(100),
217 'description': randstr(254),
218 'min_role_id': random.sample(roles.values(), 1)[0],
221 def isequal(object_fields, expected_fields):
223 for field in expected_fields:
224 assert field in object_fields
225 assert object_fields[field] == expected_fields[field]
230 def islistequal(list1, list2):
232 assert set(list1) == set(list2)
237 def isunique(id, id_list):
239 assert id not in id_list
244 class api_unit_test(Test):
259 self.api = self.config.api
260 self.auth = self.config.auth
261 self.all_methods = set(self.api.system.listMethods())
262 self.methods_tested = set()
263 self.methods_failed = set()
265 # Begin testing methods
268 #self.boot_state_ids = self.BootStates(boot_states)
269 self.site_ids = self.Sites(sites)
270 #self.peer_ids = self.Peers(peers)
271 self.address_type_ids = self.AddressTypes(address_types)
272 self.addresse_ids = self.Addresses(addresses)
273 #self.conf_files = self.ConfFiles(conf_files)
274 #self.network_method_ids = self.NetworkMethods()
275 #self.network_type_ids = self.NetworkTypes()
276 #self.nodegroup_ids = self.NodeGroups()
277 self.node_ids = self.Nodes(nodes)
278 #self.node_network_ids = self.NodeNetworks(node_networks)
279 #self.node_network_setting_type_ids = self.NodeNetworkSettingsTypes(node_network_settings_types)
280 #self.node_network_setting_ids = self.NodeNetworkSettings(node_network_settings)
281 #self.pcu_protocol_types_ids = self.PCUProtocolTypes(pcu_protocol_types)
282 #self.pcus_ids = self.PCUs(pcus)
283 #self.pcu_types_ids = self.PCUTypes(pcu_types)
284 #self.role_ids = self.Roles(roles)
285 self.person_ids = self.Persons(persons)
286 self.key_ids = self.Keys(keys)
287 #self.key_types = self.KeyTypes(key_types)
288 #self.slice_attribute_type_ids = self.SliceAttributeTypes(slice_attribute_types)
289 #self.slice_instantiation_ids = self.SliceInstantiations(slice_instantiations)
290 #self.slice_ids = self.Slices(slices)
291 #self.slice_attribute_ids = self.SliceAttributes(slice_attributes)
292 #self.initscript_ids = self.InitScripts(initscripts)
294 # self.message_ids = self.Messages(messages)
296 # Test GetEventObject only
297 #self.event_object_ids = self.GetEventObjects()
298 #self.event_ids = self.GetEvents()
306 logfile = Logfile("api-unittest.log")
307 methods_ok = list(self.methods_tested.difference(self.methods_failed))
308 methods_failed = list(self.methods_failed)
309 methods_untested = list(self.all_methods.difference(self.methods_tested))
311 methods_failed.sort()
312 methods_untested.sort()
313 print >> logfile, "\n".join([m+": [OK]" for m in methods_ok])
314 print >> logfile, "\n".join([m+": [FAILED]" for m in methods_failed])
315 print >> logfile, "\n".join([m+": [Not Tested]" for m in methods_untested])
317 def isequal(self, object_fields, expected_fields, method_name):
319 for field in expected_fields:
320 assert field in object_fields
321 assert object_fields[field] == expected_fields[field]
323 self.methods_failed.update([method_name])
327 def islistequal(self, list1, list2, method_name):
328 try: assert set(list1) == set(list2)
330 self.methods_failed.update([method_name])
334 def isunique(self, id, id_list, method_name):
335 try: assert id not in id_list
337 self.methods_failed.update([method_name])
341 def debug(self, method, method_name=None):
342 if method_name is None:
343 method_name = method._Method__name
345 self.methods_tested.update([method_name])
346 def wrapper(*args, **kwds):
348 return method(*args, **kwds)
350 self.methods_failed.update([method_name])
356 if hasattr(self, 'initscript_ids'): self.DeleteInitScripts()
357 if hasattr(self, 'slice_attribute_ids'): self.DeleteSliceAttributes()
358 if hasattr(self, 'slice_ids'): self.DeleteSlices()
359 if hasattr(self, 'slice_instantiation_ids'): self.DeleteSliceInstantiations()
360 if hasattr(self, 'slice_attribute_type_ids'): self.DeleteSliceAttributeTypes()
361 if hasattr(self, 'slice_attribute_ids'): self.DeleteSliceAttributes()
362 if hasattr(self, 'key_type_ids'): self.DeleteKeyTypes()
363 if hasattr(self, 'key_ids'): self.DeleteKeys()
364 if hasattr(self, 'person_ids'): self.DeletePersons()
365 if hasattr(self, 'role_ids'): self.DeleteRoles()
366 if hasattr(self, 'pcu_type_ids'): self.DeletePCUTypes()
367 if hasattr(self, 'pcu_ids'): self.DeletePCUs()
368 if hasattr(self, 'pcu_protocol_type_ids'): self.DeleteProtocolTypes()
369 if hasattr(self, 'node_network_setting_ids'): self.DeleteNodeNetworkSettings()
370 if hasattr(self, 'attress_ids'): self.DeleteAddresses()
371 if hasattr(self, 'attress_type_ids'): self.DeleteAddressTypes()
372 if hasattr(self, 'node_ids'): self.DeleteNodes()
373 if hasattr(self, 'site_ids'): self.DeleteSites()
376 def Sites(self, n=4):
380 site_fields = random_site()
381 AddSite = self.debug(self.api.AddSite)
382 site_id = AddSite(self.auth, site_fields)
383 if site_id is None: continue
385 # Should return a unique id
386 self.isunique(site_id, site_ids, 'AddSite - isunique')
387 site_ids.append(site_id)
388 GetSites = self.debug(self.api.GetSites)
389 sites = GetSites(self.auth, [site_id])
390 if sites is None: continue
392 self.isequal(site, site_fields, 'AddSite - isequal')
395 site_fields = random_site()
396 UpdateSite = self.debug(self.api.UpdateSite)
397 result = UpdateSite(self.auth, site_id, site_fields)
400 sites = GetSites(self.auth, [site_id])
401 if sites is None: continue
403 self.isequal(site, site_fields, 'UpdateSite - isequal')
405 sites = GetSites(self.auth, site_ids)
406 if sites is not None:
407 self.islistequal(site_ids, [site['site_id'] for site in sites], 'GetSites - isequal')
409 if self.config.verbose:
410 utils.header("Added sites: %s" % site_ids)
415 def DeleteSites(self):
417 DeleteSite = self.debug(self.api.DeleteSite)
418 for site_id in self.site_ids:
419 result = DeleteSite(self.auth, site_id)
421 # Check if sites are deleted
422 GetSites = self.debug(self.api.GetSites)
423 sites = GetSites(self.auth, self.site_ids)
424 self.islistequal(sites, [], 'DeleteSite - check')
426 if self.config.verbose:
427 utils.header("Deleted sites: %s" % self.site_ids)
431 def Nodes(self, n=4):
435 node_fields = random_node()
436 site_id = random.sample(self.site_ids, 1)[0]
437 AddNode = self.debug(self.api.AddNode)
438 node_id = AddNode(self.auth, site_id, node_fields)
439 if node_id is None: continue
441 # Should return a unique id
442 self.isunique(node_id, node_ids, 'AddNode - isunique')
443 node_ids.append(node_id)
446 GetNodes = self.debug(self.api.GetNodes)
447 nodes = GetNodes(self.auth, [node_id])
448 if nodes is None: continue
450 self.isequal(node, node_fields, 'AddNode - isequal')
453 node_fields = random_node()
454 UpdateNode = self.debug(self.api.UpdateNode)
455 result = UpdateNode(self.auth, node_id, node_fields)
458 nodes = GetNodes(self.auth, [node_id])
459 if nodes is None: continue
461 self.isequal(node, node_fields, 'UpdateNode - isequal')
463 nodes = GetNodes(self.auth, node_ids)
464 if nodes is not None:
465 self.islistequal(node_ids, [node['node_id'] for node in nodes], 'GetNodes - isequal')
467 if self.config.verbose:
468 utils.header("Added nodes: %s" % node_ids)
472 def DeleteNodes(self):
473 DeleteNode = self.debug(self.api.DeleteNode)
474 for node_id in self.node_ids:
475 result = DeleteNode(self.auth, node_id)
477 # Check if nodes are deleted
478 GetNodes = self.debug(self.api.GetNodes)
479 nodes = GetNodes(self.api, self.node_ids)
480 self.islistequal(nodes, [], 'DeleteNode Check')
482 if self.config.verbose:
483 utils.header("Deleted nodes: %s" % self.node_ids)
487 def AddressTypes(self, n = 3):
488 address_type_ids = []
490 address_type_fields = random_address_type()
491 AddAddressType = self.debug(self.api.AddAddressType)
492 address_type_id = AddAddressType(self.auth, address_type_fields)
493 if address_type_id is None: continue
495 # Should return a unique address_type_id
496 self.isunique(address_type_id, address_type_ids, 'AddAddressType - isunique')
497 address_type_ids.append(address_type_id)
500 GetAddressTypes = self.debug(self.api.GetAddressTypes)
501 address_types = GetAddressTypes(self.auth, [address_type_id])
502 if address_types is None: continue
503 address_type = address_types[0]
504 self.isequal(address_type, address_type_fields, 'AddAddressType - isequal')
506 # Update address type
507 address_type_fields = random_address_type()
508 UpdateAddressType = self.debug(self.api.UpdateAddressType)
509 result = UpdateAddressType(self.auth, address_type_id, address_type_fields)
510 if result is None: continue
512 # Check address type again
513 address_types = GetAddressTypes(self.auth, [address_type_id])
514 if address_types is None: continue
515 address_type = address_types[0]
516 self.isequal(address_type, address_type_fields, 'UpdateAddressType - isequal')
518 # Check get all address types
519 address_types = GetAddressTypes(self.auth, address_type_ids)
520 if address_types is not None:
521 self.islistequal(address_type_ids, [address_type['address_type_id'] for address_type in address_types], 'GetAddressTypes - isequal')
523 if self.config.verbose:
524 print "Added address types", address_type_ids
526 return address_type_ids
528 def DeleteAddressTypes(self):
530 DeleteAddressType = self.debug(self.api.DeleteAddressType)
531 for address_type_id in self.address_type_ids:
532 DeleteAddressType(auth, address_type_id)
534 GetAddressTypes = self.debug(self.api.GetAddressTypes)
535 address_types = GetAddressTypes(self.auth, self.address_type_ids)
536 self.islistequal(address_types, [], 'DeleteAddressType - check')
538 if self.config.verbose:
539 utils.header("Deleted address types: " % self.address_type_ids)
541 self.address_type_ids = []
543 def Addresses(self, n = 3):
546 address_fields = random_address()
547 site_id = random.sample(self.site_ids, 1)[0]
548 AddSiteAddress = self.debug(self.api.AddSiteAddress)
549 address_id = AddSiteAddress(self.auth, site_id, address_fields)
550 if address_id is None: continue
552 # Should return a unique address_id
553 self.isunique(address_id, address_ids, 'AddSiteAddress - isunique')
554 address_ids.append(address_id)
557 GetAddresses = self.debug(self.api.GetAddresses)
558 addresses = GetAddresses(self.auth, [address_id])
559 if addresses is None: continue
560 address = addresses[0]
561 self.isequal(address, address_fields, 'AddSiteAddress - isequal')
564 address_fields = random_address()
565 UpdateAddress = self.debug(self.api.UpdateAddress)
566 result = UpdateAddress(self.auth, address_id, address_fields)
569 addresses = GetAddresses(self.auth, [address_id])
570 if addresses is None: continue
571 address = addresses[0]
572 self.isequal(address, address_fields, 'UpdateAddress - isequal')
574 addresses = GetAddress(self.auth, address_ids)
575 if addresses is not None:
576 slef.islistequal(address_ids, [ad['address_id'] for ad in addresses], 'GetAddresses - isequal')
578 if self.config.verbose:
579 utils.header("Added addresses: %s" % self.address_ids)
583 def DeleteAddresses(self):
585 DeleteAddress = self.debug(self.api.DeleteAddress)
586 # Delete site addresses
587 for address_id in self.address_ids:
588 result = DeleteAddress(self.auth, address_id)
591 GetAddresses = self.debug(self.api.GetAddresses)
592 addresses = GetAddresses(self.api, self.address_ids)
593 self.islistequal(addresses, [], 'DeleteAddress - check')
595 print "Deleted addresses", self.address_ids
597 self.address_ids = []
599 def AddPersons(self, n = 3):
602 role_ids = [role['role_id'] for role in roles]
603 roles = [role['name'] for role in roles]
604 roles = dict(zip(roles, role_ids))
609 person_fields = random_person()
610 person_id = AddPerson(person_fields)
612 # Should return a unique person_id
613 assert person_id not in self.person_ids
614 self.person_ids.append(person_id)
618 person = GetPersons([person_id])[0]
619 for field in person_fields:
620 if field != 'password':
621 assert person[field] == person_fields[field]
624 person_fields = random_person()
625 UpdatePerson(person_id, person_fields)
627 # Check account again
628 person = GetPersons([person_id])[0]
629 for field in person_fields:
630 if field != 'password':
631 assert person[field] == person_fields[field]
633 auth = {'AuthMethod': "password",
634 'Username': person_fields['email'],
635 'AuthString': person_fields['password']}
638 # Check that account is disabled
640 assert not AuthCheck(auth)
644 # Add random set of roles
645 person_roles = random.sample(['user', 'pi', 'tech'], randint(1, 3))
646 for person_role in person_roles:
647 role_id = roles[person_role]
648 AddRoleToPerson(role_id, person_id)
651 person = GetPersons([person_id])[0]
652 assert set(person_roles) == set(person['roles'])
655 UpdatePerson(person_id, {'enabled': True})
658 # Check that account is enabled
659 assert AuthCheck(auth)
661 # Associate account with random set of sites
663 for site_id in random.sample(self.site_ids, randint(1, len(self.site_ids))):
664 AddPersonToSite(person_id, site_id)
665 person_site_ids.append(site_id)
668 # Make sure it really did it
669 person = GetPersons([person_id])[0]
670 assert set(person_site_ids) == set(person['site_ids'])
673 primary_site_id = random.sample(person_site_ids, randint(1, len(person_site_ids)))[0]
674 SetPersonPrimarySite(person_id, primary_site_id)
677 person = GetPersons([person_id])[0]
678 assert person['site_ids'][0] == primary_site_id
681 print "Added users", self.person_ids
683 def DeletePersons(self):
685 for person_id in self.person_ids:
686 # Remove from each site
687 for site_id in self.site_ids:
688 DeletePersonFromSite(person_id, site_id)
691 person = GetPersons([person_id])[0]
692 assert not person['site_ids']
695 person = GetPersons([person_id])[0]
696 for role_id in person['role_ids']:
697 DeleteRoleFromPerson(role_id, person_id)
700 person = GetPersons([person_id])[0]
701 assert not person['role_ids']
704 UpdatePerson(person_id, {'enabled': False})
707 person = GetPersons([person_id])[0]
708 assert not person['enabled']
711 DeletePerson(person_id)
714 assert not GetPersons([person_id])
717 assert not GetPersons(self.person_ids)
720 print "Deleted users", self.person_ids
725 if __name__ == '__main__':
726 args = tuple(sys.argv[1:])
727 api_unit_test()(*args)