1 #!/usr/bin/env /usr/share/plc_api/plcsh
5 # Copyright (C) 2006 The Trustees of Princeton University
9 from pprint import pprint
10 from string import letters, digits, punctuation
12 from traceback import print_exc
21 from qa.Config import Config
22 from qa.logger import Logfile, log
23 from random import Random
# Enumerations used by the random data generators below.  Each one is
# requested from the API first; if that call fails for any reason the
# hard-coded list is used instead, so the generators always have
# something to sample from.
#
# NOTE(review): `api` and `auth` are not defined in this file; they are
# presumably injected by the plcsh interpreter named in the shebang --
# confirm.
#
# `except Exception` (rather than a bare `except`) keeps the best-effort
# fallback but no longer swallows KeyboardInterrupt / SystemExit.
try:
    boot_states = api.GetBootStates(auth)
except Exception:
    boot_states = [u'boot', u'dbg', u'inst', u'new', u'rcnf', u'rins']

try:
    roles = [role['name'] for role in api.GetRoles(auth)]
except Exception:
    roles = [u'admin', u'pi', u'user', u'tech']

try:
    methods = api.GetNetworkMethods(auth)
except Exception:
    methods = [u'static', u'dhcp', u'proxy', u'tap', u'ipmi', u'unknown']

try:
    key_types = api.GetKeyTypes(auth)
except Exception:
    key_types = [u'ssh']

try:
    types = api.GetNetworkTypes(auth)
except Exception:
    types = [u'ipv4']
# Ask the API for the sites, requesting only the 'login_base' field, and
# collect those values into a flat list.
sites = api.GetSites(auth, None, ['login_base'])
login_bases = []
for site in sites:
    login_bases.append(site['login_base'])
def randfloat(min = 0.0, max = 1.0):
    """
    Return a random float obtained by scaling a single random.random()
    draw into the span that starts at min and extends towards max.

    Both bounds are converted with float() first, so anything float()
    accepts may be passed.
    """
    low = float(min)
    width = float(max) - low
    return low + (random.random() * width)
def randint(min = 0, max = 1):
    """
    Return a random integer N with min <= N <= max (both ends included).

    A float is drawn with the same scaling randfloat(min, max + 1) uses
    and then rounded DOWN.  The previous implementation used int(),
    which truncates toward zero: for a negative range such as (-3, -1)
    it could return 0 (outside the range) and practically never
    returned -3.  For non-negative results the outcome is unchanged.

    max is expected to be >= min.
    """
    low = float(min)
    value = low + (random.random() * (float(max + 1) - low))

    result = int(value)
    # int() truncates toward zero; step down once for negative
    # non-integers so that the result is floor(value).
    if result > value:
        result -= 1
    return result
59 # See "2.2 Characters" in the XML specification:
61 # #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD]
63 # [#x7F-#x84], [#x86-#x9F], [#xFDD0-#xFDDF]
# Character pools, from narrowest to widest.  Each pool extends the
# previous one.  xrange() excludes its upper bound, so every range is
# written as (first, last + 1) to make the last code point inclusive;
# the earlier bounds were one or two short and silently dropped
# 0x7D-0x7E, 0x85, 0xFF, 0xD7FF, 0xFDCE-0xFDCF and 0xFFFD.

# Tab, LF, CR and the printable ASCII range 0x20-0x7E.
ascii_xml_chars = [unichr(code) for code in (0x9, 0xA, 0xD)]
ascii_xml_chars += [unichr(code) for code in xrange(0x20, 0x7E + 1)]

# Adds the code points up to 0xFF, skipping the discouraged ranges
# 0x7F-0x84 and 0x86-0x9F (0x85 itself is kept).
low_xml_chars = list(ascii_xml_chars)
low_xml_chars += [unichr(code) for code in xrange(0x85, 0x85 + 1)]
low_xml_chars += [unichr(code) for code in xrange(0xA0, 0xFF + 1)]

# Adds 0x100-0xD7FF and 0xE000-0xFFFD, skipping the discouraged range
# 0xFDD0-0xFDDF.  The surrogate block 0xD800-0xDFFF is not included.
valid_xml_chars = list(low_xml_chars)
valid_xml_chars += [unichr(code) for code in xrange(0x100, 0xD7FF + 1)]
valid_xml_chars += [unichr(code) for code in xrange(0xE000, 0xFDCF + 1)]
valid_xml_chars += [unichr(code) for code in xrange(0xFDE0, 0xFFFD + 1)]
75 def randstr(length, pool = valid_xml_chars, encoding = "utf-8"):
76 sample = random.sample(pool, min(length, len(pool)))
79 bytes = len(s.encode(encoding))
83 sample += random.sample(pool, min(length - bytes, len(pool)))
84 random.shuffle(sample)
90 # 1. Each part begins and ends with a letter or number.
91 # 2. Each part except the last can contain letters, numbers, or hyphens.
92 # 3. Each part is between 1 and 64 characters, including the trailing dot.
93 # 4. At least two parts.
94 # 5. Last part can only contain between 2 and 6 letters.
95 hostname = 'a' + randstr(61, letters + digits + '-') + '1.' + \
96 'b' + randstr(61, letters + digits + '-') + '2.' + \
97 'c' + randstr(5, letters)
100 def randpath(length):
102 for i in range(randint(1, 10)):
103 parts.append(randstr(randint(1, 30), ascii_xml_chars))
104 return os.sep.join(parts)[0:length]
107 return (randstr(100, letters + digits) + "@" + randhostname()).lower()
109 def randkey(bits = 2048):
110 key_types = ["ssh-dss", "ssh-rsa"]
111 key_type = random.sample(key_types, 1)[0]
112 return ' '.join([key_type,
113 base64.b64encode(''.join(randstr(bits / 8).encode("utf-8"))),
118 'name': randstr(254),
119 'abbreviated_name': randstr(50),
120 'login_base': randstr(20, letters).lower(),
121 'latitude': int(randfloat(-90.0, 90.0) * 1000) / 1000.0,
122 'longitude': int(randfloat(-180.0, 180.0) * 1000) / 1000.0,
126 def random_address_type():
129 'description': randstr(254),
132 def random_address():
134 'line1': randstr(254),
135 'line2': randstr(254),
136 'line3': randstr(254),
137 'city': randstr(254),
138 'state': randstr(254),
139 'postalcode': randstr(64),
140 'country': randstr(128),
145 'first_name': randstr(128),
146 'last_name': randstr(128),
147 'email': randemail(),
149 # Accounts are disabled by default
151 'password': randstr(254),
156 'key_type': random.sample(key_types, 1)[0],
162 'name': random.sample(login_bases, 1)[0] + "_" + randstr(11, letters).lower(),
163 'url': "http://" + randhostname() + "/",
164 'description': randstr(2048),
167 def random_nodegroup():
170 'description': randstr(200),
175 'hostname': randhostname(),
176 'boot_state': random.sample(boot_states, 1)[0],
177 'model': randstr(255),
178 'version': randstr(64),
181 def random_nodenetwork():
182 nodenetwork_fields = {
183 'method': random.sample(methods, 1)[0],
184 'type': random.sample(types, 1)[0],
185 'bwlimit': randint(500000, 10000000),
189 ip = randint(0, 0xffffffff)
190 netmask = (0xffffffff << randint(2, 31)) & 0xffffffff
191 network = ip & netmask
192 broadcast = ((ip & netmask) | ~netmask) & 0xffffffff
193 gateway = randint(network + 1, broadcast - 1)
194 dns1 = randint(0, 0xffffffff)
196 for field in 'ip', 'netmask', 'network', 'broadcast', 'gateway', 'dns1':
197 nodenetwork_fields[field] = socket.inet_ntoa(struct.pack('>L', locals()[field]))
199 return nodenetwork_fields
203 'hostname': randhostname(),
204 'ip': socket.inet_ntoa(struct.pack('>L', randint(0, 0xffffffff))),
205 'protocol': randstr(16),
206 'username': randstr(254),
207 'password': randstr(254),
208 'notes': randstr(254),
209 'model': randstr(32),
212 def random_conf_file():
214 'enabled': bool(randint()),
215 'source': randpath(255),
216 'dest': randpath(255),
217 'file_permissions': "%#o" % randint(0, 512),
218 'file_owner': randstr(32, letters + '_' + digits),
219 'file_group': randstr(32, letters + '_' + digits),
220 'preinstall_cmd': randpath(100),
221 'postinstall_cmd': randpath(100),
222 'error_cmd': randpath(100),
223 'ignore_cmd_errors': bool(randint()),
224 'always_update': bool(randint()),
227 def random_attribute_type():
229 'name': randstr(100),
230 'description': randstr(254),
231 'min_role_id': random.sample(roles.values(), 1)[0],
234 def isequal(object_fields, expected_fields):
236 for field in expected_fields:
237 assert field in object_fields
238 assert object_fields[field] == expected_fields[field]
243 def islistequal(list1, list2):
245 assert set(list1) == set(list2)
250 def isunique(id, id_list):
252 assert id not in id_list
257 class api_unit_test(Test):
259 error_log = Logfile('api-unittest-error.log')
274 self.all_methods = set(api.system.listMethods())
275 self.methods_tested = set()
276 self.methods_failed = set()
278 # Begin testing methods
281 #self.boot_state_ids = self.BootStates(boot_states)
282 self.site_ids = self.Sites(sites)
283 #self.peer_ids = self.Peers(peers)
284 self.address_type_ids = self.AddressTypes(address_types)
285 self.address_ids = self.Addresses(addresses)
286 #self.conf_files = self.ConfFiles(conf_files)
287 #self.network_method_ids = self.NetworkMethods()
288 #self.network_type_ids = self.NetworkTypes()
289 #self.nodegroup_ids = self.NodeGroups()
290 self.node_ids = self.Nodes(nodes)
291 #self.node_network_ids = self.NodeNetworks(node_networks)
292 #self.node_network_setting_type_ids = self.NodeNetworkSettingsTypes(node_network_settings_types)
293 #self.node_network_setting_ids = self.NodeNetworkSettings(node_network_settings)
294 #self.pcu_protocol_types_ids = self.PCUProtocolTypes(pcu_protocol_types)
295 #self.pcus_ids = self.PCUs(pcus)
296 #self.pcu_types_ids = self.PCUTypes(pcu_types)
297 #self.role_ids = self.Roles(roles)
298 #self.key_types = self.KeyTypes(key_types)
299 #self.slice_attribute_type_ids = self.SliceAttributeTypes(slice_attribute_types)
300 #self.slice_instantiation_ids = self.SliceInstantiations(slice_instantiations)
301 self.slice_ids = self.Slices(slices)
302 #self.slice_attribute_ids = self.SliceAttributes(slice_attributes)
303 #self.initscript_ids = self.InitScripts(initscripts)
304 self.person_ids = self.Persons(persons)
305 self.key_ids = self.Keys(keys)
306 # self.message_ids = self.Messages(messages)
308 # Test GetEventObject only
309 #self.event_object_ids = self.GetEventObjects()
310 #self.event_ids = self.GetEvents()
317 utils.header("writing api_unitest.log")
318 logfile = Logfile("api-unittest-summary.log")
319 methods_ok = list(self.methods_tested.difference(self.methods_failed))
320 methods_failed = list(self.methods_failed)
321 methods_untested = list(self.all_methods.difference(self.methods_tested))
323 methods_failed.sort()
324 methods_untested.sort()
325 print >> logfile, "\n".join([m+": [OK]" for m in methods_ok])
326 print >> logfile, "\n".join([m+": [FAILED]" for m in methods_failed])
327 print >> logfile, "\n".join([m+": [Not Tested]" for m in methods_untested])
329 def isequal(self, object_fields, expected_fields, method_name):
331 for field in expected_fields:
332 assert field in object_fields
333 assert object_fields[field] == expected_fields[field]
335 self.methods_failed.update([method_name])
339 def islistequal(self, list1, list2, method_name):
340 try: assert set(list1) == set(list2)
342 self.methods_failed.update([method_name])
346 def isunique(self, id, id_list, method_name):
347 try: assert id not in id_list
349 self.methods_failed.update([method_name])
353 def debug(self, method, method_name=None):
354 if method_name is None:
355 method_name = method.name
357 self.methods_tested.update([method_name])
358 def wrapper(*args, **kwds):
360 return method(*args, **kwds)
362 self.methods_failed.update([method_name])
363 print >> self.error_log, "%s: %s\n" % (method_name, traceback.format_exc())
369 if hasattr(self, 'key_type_ids'): self.DeleteKeyTypes()
370 if hasattr(self, 'key_ids'): self.DeleteKeys()
371 if hasattr(self, 'person_ids'): self.DeletePersons()
372 if hasattr(self, 'initscript_ids'): self.DeleteInitScripts()
373 if hasattr(self, 'slice_attribute_ids'): self.DeleteSliceAttributes()
374 if hasattr(self, 'slice_ids'): self.DeleteSlices()
375 if hasattr(self, 'slice_instantiation_ids'): self.DeleteSliceInstantiations()
376 if hasattr(self, 'slice_attribute_type_ids'): self.DeleteSliceAttributeTypes()
377 if hasattr(self, 'slice_attribute_ids'): self.DeleteSliceAttributes()
378 if hasattr(self, 'role_ids'): self.DeleteRoles()
379 if hasattr(self, 'pcu_type_ids'): self.DeletePCUTypes()
380 if hasattr(self, 'pcu_ids'): self.DeletePCUs()
381 if hasattr(self, 'pcu_protocol_type_ids'): self.DeleteProtocolTypes()
382 if hasattr(self, 'node_network_setting_ids'): self.DeleteNodeNetworkSettings()
383 if hasattr(self, 'address_ids'): self.DeleteAddresses()
384 if hasattr(self, 'address_type_ids'): self.DeleteAddressTypes()
385 if hasattr(self, 'node_ids'): self.DeleteNodes()
386 if hasattr(self, 'site_ids'): self.DeleteSites()
389 def Sites(self, n=4):
393 site_fields = random_site()
394 AddSite = self.debug(api.AddSite)
395 site_id = AddSite(auth, site_fields)
396 if site_id is None: continue
398 # Should return a unique id
399 self.isunique(site_id, site_ids, 'AddSite - isunique')
400 site_ids.append(site_id)
401 GetSites = self.debug(api.GetSites)
402 sites = GetSites(auth, [site_id])
403 if sites is None: continue
405 self.isequal(site, site_fields, 'AddSite - isequal')
408 site_fields = random_site()
409 UpdateSite = self.debug(api.UpdateSite)
410 result = UpdateSite(auth, site_id, site_fields)
413 sites = GetSites(auth, [site_id])
414 if sites is None: continue
416 self.isequal(site, site_fields, 'UpdateSite - isequal')
418 sites = GetSites(site_ids)
419 if sites is not None:
420 self.islistequal(site_ids, [site['site_id'] for site in sites], 'GetSites - isequal')
422 if self.config.verbose:
423 utils.header("Added sites: %s" % site_ids)
428 def DeleteSites(self):
430 DeleteSite = self.debug(api.DeleteSite)
431 for site_id in self.site_ids:
432 result = DeleteSite(site_id)
434 # Check if sites are deleted
435 GetSites = self.debug(api.GetSites)
436 sites = GetSites(auth, self.site_ids)
437 self.islistequal(sites, [], 'DeleteSite - check')
439 if self.config.verbose:
440 utils.header("Deleted sites: %s" % self.site_ids)
444 def Nodes(self, n=4):
448 node_fields = random_node()
449 site_id = random.sample(self.site_ids, 1)[0]
450 AddNode = self.debug(api.AddNode)
451 node_id = AddNode(auth, site_id, node_fields)
452 if node_id is None: continue
454 # Should return a unique id
455 self.isunique(node_id, node_ids, 'AddNode - isunique')
456 node_ids.append(node_id)
459 GetNodes = self.debug(api.GetNodes)
460 nodes = GetNodes(auth, [node_id])
461 if nodes is None: continue
463 self.isequal(node, node_fields, 'AddNode - isequal')
466 node_fields = random_node()
467 UpdateNode = self.debug(api.UpdateNode)
468 result = UpdateNode(auth, node_id, node_fields)
471 nodes = GetNodes(auth, [node_id])
472 if nodes is None: continue
474 self.isequal(node, node_fields, 'UpdateNode - isequal')
476 nodes = GetNodes(auth, node_ids)
477 if nodes is not None:
478 self.islistequal(node_ids, [node['node_id'] for node in nodes], 'GetNodes - isequal')
480 if self.config.verbose:
481 utils.header("Added nodes: %s" % node_ids)
485 def DeleteNodes(self):
486 DeleteNode = self.debug(api.DeleteNode)
487 for node_id in self.node_ids:
488 result = DeleteNode(auth, node_id)
490 # Check if nodes are deleted
491 GetNodes = self.debug(api.GetNodes)
492 nodes = GetNodes(auth, self.node_ids)
493 self.islistequal(nodes, [], 'DeleteNode Check')
495 if self.config.verbose:
496 utils.header("Deleted nodes: %s" % self.node_ids)
500 def AddressTypes(self, n = 3):
501 address_type_ids = []
503 address_type_fields = random_address_type()
504 AddAddressType = self.debug(api.AddAddressType)
505 address_type_id = AddAddressType(auth, address_type_fields)
506 if address_type_id is None: continue
508 # Should return a unique address_type_id
509 self.isunique(address_type_id, address_type_ids, 'AddAddressType - isunique')
510 address_type_ids.append(address_type_id)
513 GetAddressTypes = self.debug(api.GetAddressTypes)
514 address_types = GetAddressTypes(auth, [address_type_id])
515 if address_types is None: continue
516 address_type = address_types[0]
517 self.isequal(address_type, address_type_fields, 'AddAddressType - isequal')
519 # Update address type
520 address_type_fields = random_address_type()
521 UpdateAddressType = self.debug(api.UpdateAddressType)
522 result = UpdateAddressType(auth, address_type_id, address_type_fields)
523 if result is None: continue
525 # Check address type again
526 address_types = GetAddressTypes(auth, [address_type_id])
527 if address_types is None: continue
528 address_type = address_types[0]
529 self.isequal(address_type, address_type_fields, 'UpdateAddressType - isequal')
531 # Check get all address types
532 address_types = GetAddressTypes(auth, address_type_ids)
533 if address_types is not None:
534 self.islistequal(address_type_ids, [address_type['address_type_id'] for address_type in address_types], 'GetAddressTypes - isequal')
536 if self.config.verbose:
537 utils.header("Added address types: %s " % address_type_ids)
539 return address_type_ids
def DeleteAddressTypes(self):
    """
    Delete every address type recorded in self.address_type_ids, check
    through GetAddressTypes that none of them can still be fetched, and
    reset self.address_type_ids to an empty list.

    Both API calls go through self.debug(); a failed verification is
    recorded by self.islistequal() under 'DeleteAddressType - check'.
    """
    DeleteAddressType = self.debug(api.DeleteAddressType)
    for address_type_id in self.address_type_ids:
        DeleteAddressType(auth, address_type_id)

    # Check if address types are deleted
    GetAddressTypes = self.debug(api.GetAddressTypes)
    address_types = GetAddressTypes(auth, self.address_type_ids)
    self.islistequal(address_types, [], 'DeleteAddressType - check')

    if self.config.verbose:
        # The format string previously had no %s placeholder, so the
        # deleted ids never appeared in the message.
        utils.header("Deleted address types: %s" % self.address_type_ids)

    self.address_type_ids = []
556 def Addresses(self, n = 3):
559 address_fields = random_address()
560 site_id = random.sample(self.site_ids, 1)[0]
561 AddSiteAddress = self.debug(api.AddSiteAddress)
562 address_id = AddSiteAddress(auth, site_id, address_fields)
563 if address_id is None: continue
565 # Should return a unique address_id
566 self.isunique(address_id, address_ids, 'AddSiteAddress - isunique')
567 address_ids.append(address_id)
570 GetAddresses = self.debug(api.GetAddresses)
571 addresses = GetAddresses(auth, [address_id])
572 if addresses is None: continue
573 address = addresses[0]
574 self.isequal(address, address_fields, 'AddSiteAddress - isequal')
577 address_fields = random_address()
578 UpdateAddress = self.debug(api.UpdateAddress)
579 result = UpdateAddress(auth, address_id, address_fields)
582 addresses = GetAddresses(auth, [address_id])
583 if addresses is None: continue
584 address = addresses[0]
585 self.isequal(address, address_fields, 'UpdateAddress - isequal')
587 addresses = GetAddresses(auth, address_ids)
588 if addresses is not None:
589 self.islistequal(address_ids, [ad['address_id'] for ad in addresses], 'GetAddresses - isequal')
591 if self.config.verbose:
592 utils.header("Added addresses: %s" % address_ids)
def DeleteAddresses(self):
    """
    Remove each address listed in self.address_ids through the API,
    verify with GetAddresses that the lookup now comes back empty, and
    then clear self.address_ids.

    The verification result is recorded by self.islistequal() under
    'DeleteAddress - check'.
    """
    remove_address = self.debug(api.DeleteAddress)
    # Delete site addresses
    for doomed_id in self.address_ids:
        remove_address(auth, doomed_id)

    # Look the same ids up again; nothing should be returned any more
    fetch_addresses = self.debug(api.GetAddresses)
    remaining = fetch_addresses(auth, self.address_ids)
    self.islistequal(remaining, [], 'DeleteAddress - check')
    if self.config.verbose:
        utils.header("Deleted addresses: %s" % self.address_ids)

    self.address_ids = []
612 def Slices(self, n = 3):
616 slice_fields = random_slice()
617 AddSlice = self.debug(api.AddSlice)
618 slice_id = AddSlice(auth, slice_fields)
619 if slice_id is None: continue
621 # Should return a unique id
622 self.isunique(slice_id, slice_ids, 'AddSlicel - isunique')
623 slice_ids.append(slice_id)
624 GetSlices = self.debug(api.GetSlices)
625 slices = GetSlices(auth, [slice_id])
626 if slices is None: continue
628 self.isequal(slice, slice_fields, 'AddSlice - isequal')
631 slice_fields = random_slice()
632 UpdateSlice = self.debug(api.UpdateSlice)
633 result = UpdateSlice(auth, slice_id, slice_fields)
636 slices = GetSlices(auth, [slice_id])
637 if slices is None: continue
639 self.isequal(slice, slice_fields, 'UpdateSlice - isequal')
645 slices = GetSlices(auth, slice_ids)
646 if slices is not None:
647 self.islistequal(slice_ids, [slice['slice_id'] for slice in slices], 'GetSlices - isequal')
649 if self.config.verbose:
650 utils.header("Added slices: %s" % slice_ids)
654 def DeleteSlices(self):
656 # XX manually delete attributes for first slice
657 GetSlices = self.debug(api.GetSlices)
658 slices = GetSlices(auth, self.slice_ids, ['slice_attribute_ids', 'node_ids'])
660 # Have DeleteSlice automatically delete attributes for the rest
661 DeleteSlice = self.debug(api.DeleteSlice)
662 for slice_id in self.slice_ids:
664 DeleteSlice(auth, slice_id)
666 # Check if slices are deleted
667 GetSlices = self.debug(api.GetSlices)
668 slices = GetSlices(auth, self.slice_ids)
669 self.islistequal(slices, [], 'DeleteSlice - check')
671 if self.config.verbose:
672 utils.header("Deleted slices: %s" % self.slice_ids)
676 def Persons(self, n = 3):
682 person_fields = random_person()
683 AddPerson = self.debug(api.AddPerson)
684 person_id = AddPerson(auth, person_fields)
685 if person_id is None: continue
687 # Should return a unique person_id
688 self.isunique(person_id, person_ids, 'AddPerson - isunique')
689 person_ids.append(person_id)
690 GetPersons = self.debug(api.GetPersons)
691 persons = GetPersons(auth, [person_id])
692 if persons is None: continue
694 self.isequal(person, person_fields, 'AddPerson - isequal')
697 person_fields = random_person()
698 person_fields['enabled'] = True
699 UpdatePerson = self.debug(api.UpdatePerson)
700 result = UpdatePerson(auth, person_id, person_fields)
703 AddRoleToPerson = self.debug(api.AddRoleToPerson)
704 role = random.sample(roles, 1)[0]
705 result = AddRoleToPerson(auth, role, person_id)
709 key_id = AddPersonKey = self.debug(api.AddPersonKey)
710 AddPersonKey(auth, person_id, key)
713 site_id = random.sample(self.site_ids, 1)[0]
714 AddPersonToSite = self.debug(api.AddPersonToSite)
715 AddPersonToSite(auth, person_id, site_id)
717 # Add person to slice
718 slice_id = random.sample(self.slice_ids, 1)[0]
719 AddPersonToSlice = self.debug(api.AddPersonToSlice)
720 AddPersonToSlice(auth, person_id, slice_id)
722 # check role, key, site, slice
723 persons = GetPersons(auth, [person_id], ['roles', 'key_ids', 'site_ids', 'slice_ids'])
724 if persons is None or not persons: continue
726 self.islistequal([role], person['roles'], 'AddRoleToPerson - check')
727 self.islistequal([key_id], person['key_ids'], 'AddPersonKey - check')
728 self.islistequal([site_id], person['site_ids'], 'AddPersonToSite - check')
729 self.islistequal([slice_id], person['slice_ids'], 'AddPersonToSlice - check')
731 persons = GetPersons(auth, person_ids)
732 if persons is not None:
733 self.islistequal(person_ids, [p['person_id'] for p in persons], 'GetPersons - isequal')
735 if self.config.verbose:
736 utils.header("Added users: %s" % person_ids)
740 def DeletePersons(self):
742 # Delete attributes manually for first person
743 GetPersons = self.debug(api.GetPersons)
744 persons = GetPersons(auth, self.person_ids, ['person_id' , 'key_ids', 'site_ids', 'slice_ids', 'roles'])
745 if persons is None or not persons: return 0
750 role = random.sample(person['roles'], 1)[0]
751 DeleteRoleFromPerson = self.debug(api.DeleteRoleFromPerson)
752 DeleteRoleFromPerson(auth, role, person['person_id'])
754 if person['key_ids']:
756 key_id = random.sample(person['key_ids'], 1)[0]
757 DeleteKey = self.debug(api.DeleteKey)
758 DeleteKey(auth, key_id)
760 if person['site_ids']:
761 # Remove person from site
762 site_id = random.sample(person['site_ids'], 1)[0]
763 DeletePersonFromSite = self.debug(api.DeletePersonFromSite)
764 DeletePersonFromSite(auth, person['person_id'], site_id)
766 if person['slice_ids']:
767 # Remove person from slice
768 slice_id = random.sample(person['slice_ids'], 1)[0]
769 DeletePersonFromSlice = self.debug(api.DeletePersonFromSlice)
770 DeletePersonFromSlice(auth, person['person_id'], slice_id)
772 # check role, key, site, slice
773 persons = GetPersons(auth, [person['person_id']], ['roles', 'key_ids', 'site_ids', 'slice_ids'])
774 if persons is None or not persons: return 0
776 self.islistequal([], person['roles'], 'DeleteRoleFromPerson - check')
777 self.islistequal([], person['key_ids'], 'DeleteKey - check')
778 self.islistequal([], person['site_ids'], 'DeletePersonFromSite - check')
779 self.islistequal([], person['slice_ids'], 'DeletePersonFromSlice - check')
781 DeletePerson = self.debug(api.DeletePerson)
782 # Have DeletePerson automatically delete attributes for all other persons
783 for person_id in self.person_ids:
785 DeletePerson(auth, person_id)
787 # Check if persons are deleted
788 GetPersons = self.debug(api.GetPersons)
789 persons = GetPersons(auth, self.person_ids)
790 self.islistequal(persons, [], 'DeletePerson - check')
792 if self.config.verbose:
793 utils.header("Deleted users: %s" % self.person_ids)
798 def Keys(self, n = 3):
801 # Add a key to an account
802 key_fields = random_key()
803 person_id = random.sample(self.person_ids, 1)[0]
804 AddPersonKey = self.debug(api.AddPersonKey)
805 key_id = AddPersonKey(auth, person_id, key_fields)
806 if key_id is None: continue
808 # Should return a unique key_id
809 self.isunique(key_id, key_ids, 'AddPersonKey - isunique')
810 key_ids.append(key_id)
811 GetKeys = self.debug(api.GetKeys)
812 keys = GetKeys(auth, [key_id])
813 if keys is None: continue
815 self.isequal(key, key_fields, 'AddPersonKey - isequal')
818 key_fields = random_key()
819 UpdateKey = self.debug(api.UpdateKey)
820 result = UpdateKey(auth, key_id, key_fields)
822 keys = GetKeys(auth, [key_id])
823 if keys is None or not keys: continue
825 self.isequal(key, key_fields, 'UpdatePersonKey - isequal')
827 keys = GetKeys(auth, key_ids)
829 self.islistequal(key_ids, [key['key_id'] for key in keys], 'GetKeys - isequal')
831 if self.config.verbose:
832 utils.header("Added keys: %s" % key_ids)
836 def DeleteKeys(self):
838 # Blacklist first key, Delete rest
839 GetKeys = self.debug(api.GetKeys)
840 keys = GetKeys(auth, self.key_ids)
841 if keys is None or not keys: return 0
844 BlacklistKey = self.debug(api.BlacklistKey)
845 BlacklistKey(auth, key['key_id'])
847 keys = GetKeys(auth, [key['key_id']])
848 self.islistequal(keys, [], 'BlacklistKey - check')
850 if self.config.verbose:
851 utils.header("Blacklisted key: %s" % key['key_id'])
853 DeleteKey = self.debug(api.DeleteKey)
854 for key_id in self.key_ids:
855 DeleteKey(auth, key_id)
857 keys = GetKeys(auth, self.key_ids)
858 self.islistequal(keys, [], 'DeleteKey - check')
860 if self.config.verbose:
861 utils.header("Deleted keys: %s" % self.key_ids)
if __name__ == '__main__':
    # Script entry point: everything after the script name on the
    # command line is handed, as positional arguments, to a freshly
    # created api_unit_test instance, which is then called.
    args = tuple(sys.argv)[1:]
    api_unit_test()(*args)