1 #!/usr/bin/env /usr/share/plc_api/plcsh
5 # Copyright (C) 2006 The Trustees of Princeton University
9 from pprint import pprint
10 from string import letters, digits, punctuation
12 from traceback import print_exc
22 from qa.Config import Config
23 from qa.logger import Logfile, log
24 from random import Random
# Reference data for the random generators below.  Each lookup asks the live
# API first and falls back to a hard-coded list when the call fails (for
# example when the server is unreachable or the method is not available).
# "except Exception" is used rather than a bare "except" so that Ctrl-C and
# interpreter shutdown are not silently swallowed by the fallback.
try:
    boot_states = api.GetBootStates(auth)
except Exception:
    boot_states = [u'boot', u'dbg', u'inst', u'new', u'rcnf', u'rins']

try:
    roles = [role['name'] for role in api.GetRoles(auth)]
except Exception:
    roles = [u'admin', u'pi', u'user', u'tech']

try:
    methods = api.GetNetworkMethods(auth)
except Exception:
    methods = [u'static', u'dhcp', u'proxy', u'tap', u'ipmi', u'unknown']

try:
    key_types = api.GetKeyTypes(auth)
except Exception:
    key_types = [u'ssh']

try:
    types = api.GetNetworkTypes(auth)
except Exception:
    types = [u'ipv4']

try:
    attribute_types = api.GetSliceAttributeTypes(auth)
except Exception:
    attribute_types = range(20)

# No fallback here: if the site list cannot be fetched the error propagates.
sites = api.GetSites(auth, None, ['login_base'])
login_bases = [site['login_base'] for site in sites]
def randfloat(min = 0.0, max = 1.0):
    """
    Return a random float obtained by scaling random.random() onto the
    interval that starts at min and has width (max - min).

    Both bounds are coerced with float(), so ints and numeric strings work.
    """
    low = float(min)
    width = float(max) - low
    return low + (random.random() * width)

def randint(min = 0, max = 1):
    """
    Return a random integer between min and max, both ends included.

    The value is drawn from randfloat(min, max + 1) and rounded down.
    """
    value = randfloat(min, max + 1)
    whole = int(value)
    # int() truncates toward zero, which rounds negative fractions up
    # (e.g. -0.4 -> 0) and would let results escape above max for negative
    # ranges.  Step back down so the rounding is a true floor.
    if whole > value:
        whole -= 1
    return whole
64 return random.sample([True, False], 1)[0]
65 # See "2.2 Characters" in the XML specification:
67 # #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD]
69 # [#x7F-#x84], [#x86-#x9F], [#xFDD0-#xFDDF]
# Character pools for random string generation.  Each pool extends the one
# before it: ASCII only, then code points below 0x100, then the rest of the
# permitted Basic Multilingual Plane ranges listed in the comment above.
#
# xrange() excludes its stop value, so every stop below is written as
# "last wanted code point + 1".
ascii_xml_chars = map(unichr, [0x9, 0xA, 0xD])
ascii_xml_chars += map(unichr, xrange(0x20, 0x7E + 1))
low_xml_chars = list(ascii_xml_chars)
# 0x85 is the single code point between the two discouraged ranges
low_xml_chars += map(unichr, xrange(0x85, 0x85 + 1))
low_xml_chars += map(unichr, xrange(0xA0, 0xFF + 1))
valid_xml_chars = list(low_xml_chars)
valid_xml_chars += map(unichr, xrange(0x100, 0xD7FF + 1))
valid_xml_chars += map(unichr, xrange(0xE000, 0xFDCF + 1))
valid_xml_chars += map(unichr, xrange(0xFDE0, 0xFFFD + 1))
81 def randstr(length, pool = valid_xml_chars, encoding = "utf-8"):
82 sample = random.sample(pool, min(length, len(pool)))
85 bytes = len(s.encode(encoding))
89 sample += random.sample(pool, min(length - bytes, len(pool)))
90 random.shuffle(sample)
96 # 1. Each part begins and ends with a letter or number.
97 # 2. Each part except the last can contain letters, numbers, or hyphens.
98 # 3. Each part is between 1 and 64 characters, including the trailing dot.
99 # 4. At least two parts.
100 # 5. Last part can only contain between 2 and 6 letters.
101 hostname = 'a' + randstr(61, letters + digits + '-') + '1.' + \
102 'b' + randstr(61, letters + digits + '-') + '2.' + \
103 'c' + randstr(5, letters)
106 def randpath(length):
108 for i in range(randint(1, 10)):
109 parts.append(randstr(randint(1, 30), ascii_xml_chars))
110 return os.sep.join(parts)[0:length]
113 return (randstr(100, letters + digits) + "@" + randhostname()).lower()
115 def randkey(bits = 2048):
116 key_types = ["ssh-dss", "ssh-rsa"]
117 key_type = random.sample(key_types, 1)[0]
118 return ' '.join([key_type,
119 base64.b64encode(''.join(randstr(bits / 8).encode("utf-8"))),
124 'name': randstr(254),
125 'abbreviated_name': randstr(50),
126 'login_base': randstr(20, letters).lower(),
127 'latitude': int(randfloat(-90.0, 90.0) * 1000) / 1000.0,
128 'longitude': int(randfloat(-180.0, 180.0) * 1000) / 1000.0,
135 'peername': randstr(254),
136 'peer_url': "https://" + randhostname() + "/",
137 'key': base64.b64encode(''.join(randstr(bits / 8).encode("utf-8"))),
138 'cacert': base64.b64encode(''.join(randstr(bits / 8).encode("utf-8")))
140 def random_address_type():
143 'description': randstr(254),
146 def random_address():
148 'line1': randstr(254),
149 'line2': randstr(254),
150 'line3': randstr(254),
151 'city': randstr(254),
152 'state': randstr(254),
153 'postalcode': randstr(64),
154 'country': randstr(128),
159 'first_name': randstr(128),
160 'last_name': randstr(128),
161 'email': randemail(),
163 # Accounts are disabled by default
165 'password': randstr(254),
170 'key_type': random.sample(key_types, 1)[0],
176 'name': random.sample(login_bases, 1)[0] + "_" + randstr(11, letters).lower(),
177 'url': "http://" + randhostname() + "/",
178 'description': randstr(2048),
181 def random_nodegroup():
184 'description': randstr(200),
189 'hostname': randhostname(),
190 'boot_state': random.sample(boot_states, 1)[0],
191 'model': randstr(255),
192 'version': randstr(64),
195 def random_nodenetwork():
196 nodenetwork_fields = {
197 'method': random.sample(methods, 1)[0],
198 'type': random.sample(types, 1)[0],
199 'bwlimit': randint(500000, 10000000),
203 ip = randint(0, 0xffffffff)
204 netmask = (0xffffffff << randint(2, 31)) & 0xffffffff
205 network = ip & netmask
206 broadcast = ((ip & netmask) | ~netmask) & 0xffffffff
207 gateway = randint(network + 1, broadcast - 1)
208 dns1 = randint(0, 0xffffffff)
210 for field in 'ip', 'netmask', 'network', 'broadcast', 'gateway', 'dns1':
211 nodenetwork_fields[field] = socket.inet_ntoa(struct.pack('>L', locals()[field]))
213 return nodenetwork_fields
218 'hostname': randhostname(),
219 'ip': socket.inet_ntoa(struct.pack('>L', randint(0, 0xffffffff))),
220 'protocol': randstr(16),
221 'username': randstr(254),
222 'password': randstr(254),
223 'notes': randstr(254),
224 'model': randstr(32),
227 def random_conf_file():
229 'enabled': bool(randint()),
230 'source': randpath(255),
231 'dest': randpath(255),
232 'file_permissions': "%#o" % randint(0, 512),
233 'file_owner': randstr(32, letters + '_' + digits),
234 'file_group': randstr(32, letters + '_' + digits),
235 'preinstall_cmd': randpath(100),
236 'postinstall_cmd': randpath(100),
237 'error_cmd': randpath(100),
238 'ignore_cmd_errors': bool(randint()),
239 'always_update': bool(randint()),
242 def random_attribute_type():
244 'name': randstr(100),
245 'description': randstr(254),
246 'min_role_id': random.sample(roles, 1)[0],
249 def random_pcu_type():
251 'model': randstr(254),
252 'name': randstr(254),
255 def random_pcu_protocol_type():
257 'port': randint(0, 65535),
258 'protocol': randstr(254),
259 'supported': randbool()
262 def random_slice_instantiation():
264 'instantiation': randstr(10)
266 def random_slice_attribute():
268 'attribute_type_id': random.sample(attribute_types, 1)[0],
270 'description': randstr(100),
271 'min_role_id': random.sample([10,20,30,40], 1)[0],
274 def isequal(object_fields, expected_fields):
276 for field in expected_fields:
277 assert field in object_fields
278 assert object_fields[field] == expected_fields[field]
283 def islistequal(list1, list2):
285 assert set(list1) == set(list2)
290 def isunique(id, id_list):
292 assert id not in id_list
297 class api_unit_test(Test):
299 logfile = Logfile("api-unittest-summary.log")
308 pcu_protocol_types = 2,
316 nodenetworksetting_types = 2,
317 nodenetworksettings = 2,
318 slice_attribute_types = 2,
319 slice_instantiations = 2,
321 slice_attributes = 4,
329 # Filter out deprecated (Adm) and boot Methods
330 current_methods = lambda method: not method.startswith('Adm') or \
331 not method.startswith('Slice') or \
332 not method.startswith('Boot') or \
333 not method.startswith('system')
334 self.all_methods = set(api.system.listMethods())
335 self.methods_tested = set()
336 self.methods_failed = set()
338 # Begin testing methods
341 if hasattr(self, 'BootStates'): self.boot_states = self.BootStates(boot_states)
342 if hasattr(self, 'Sites'): self.site_ids = self.Sites(sites)
343 if hasattr(self, 'Peers'): self.peer_ids = self.Peers(peers)
344 if hasattr(self, 'AddressTypes'): self.address_type_ids = self.AddressTypes(address_types)
345 if hasattr(self, 'Addresses'): self.address_ids = self.Addresses(addresses)
346 if hasattr(self, 'PCUTypes'): self.pcu_type_ids = self.PCUTypes(pcu_types)
347 if hasattr(self, 'PCUProtocolTypes'): self.pcu_protocol_type_ids = self.PCUProtocolTypes(pcu_protocol_types)
348 if hasattr(self, 'PCUs'): self.pcu_ids = self.PCUs(pcus)
349 if hasattr(self, 'NetworkMethods'): self.network_methods = self.NetworkMethods()
350 if hasattr(self, 'NetworkTypes'): self.network_types = self.NetworkTypes()
351 if hasattr(self, 'NodeGroups'): self.nodegroup_ids = self.NodeGroups()
352 if hasattr(self, 'Nodes'): self.node_ids = self.Nodes(nodes)
353 if hasattr(self, 'ConfFiles'): self.conf_file_ids = self.ConfFiles(conf_files)
354 if hasattr(self, 'NodeNetworks'): self.node_network_ids = self.NodeNetworks(node_networks)
355 if hasattr(self, 'NodeNetworkSettingsTypes'): self.node_network_setting_type_ids = self.NodeNetworkSettingsTypes(node_network_settings_types)
356 if hasattr(self, 'NodeNetworkSettings'): self.node_network_setting_ids = self.NodeNetworkSettings(node_network_settings)
357 if hasattr(self, 'SliceAttributeTypes'): self.slice_attribute_types = self.SliceAttributeTypes(slice_attribute_types)
358 if hasattr(self, 'SliceInstantiations'): self.slice_instantiations = self.SliceInstantiations(slice_instantiations)
359 if hasattr(self, 'Slices'): self.slice_ids = self.Slices(slices)
360 if hasattr(self, 'SliceAttributes'): self.slice_attribute_ids = self.SliceAttributes(slice_attributes)
361 if hasattr(self, 'InitScripts'): self.initscript_ids = self.InitScripts(initscripts)
362 if hasattr(self, 'Roles'): self.role_ids = self.Roles(roles)
363 if hasattr(self, 'Persons'): self.person_ids = self.Persons(persons)
364 if hasattr(self, 'KeyTypes'): self.key_types = self.KeyTypes(key_types)
365 if hasattr(self, 'Keys'): self.key_ids = self.Keys(keys)
366 if hasattr(self, 'Messages'): self.message_ids = self.Messages(messages)
367 if hasattr(self, 'NotifyPersons'): self.NotifPersons()
368 # Test GetEventObject only
369 if hasattr(self, 'GetEventObject'): self.event_object_ids = self.GetEventObjects()
370 if hasattr(self, 'GetEvents'): self.event_ids = self.GetEvents()
377 utils.header("writing api-unittest-summary.log")
378 methods_ok = list(self.methods_tested.difference(self.methods_failed))
379 methods_failed = list(self.methods_failed)
380 methods_untested = list(self.all_methods.difference(self.methods_tested))
382 methods_failed.sort()
383 methods_untested.sort()
384 print >> self.logfile, "\n".join([m+": [OK]" for m in methods_ok])
385 print >> self.logfile, "\n".join([m+": [FAILED]" for m in methods_failed])
386 print >> self.logfile, "\n".join([m+": [Not Tested]" for m in methods_untested])
388 def isequal(self, object_fields, expected_fields, method_name):
390 for field in expected_fields:
391 assert field in object_fields
392 assert object_fields[field] == expected_fields[field]
394 self.methods_failed.update([method_name])
398 def islistequal(self, list1, list2, method_name):
399 try: assert set(list1) == set(list2)
401 self.methods_failed.update([method_name])
405 def isunique(self, id, id_list, method_name):
406 try: assert id not in id_list
408 self.methods_failed.update([method_name])
412 def debug(self, method, method_name=None):
413 if method_name is None:
414 method_name = method.name
416 self.methods_tested.update([method_name])
417 def wrapper(*args, **kwds):
419 return method(*args, **kwds)
421 self.methods_failed.update([method_name])
422 print >> self.logfile, "%s: %s\n" % (method_name, traceback.format_exc())
428 if hasattr(self, 'key_type_ids'): self.DeleteKeyTypes()
429 if hasattr(self, 'key_ids'): self.DeleteKeys()
430 if hasattr(self, 'person_ids'): self.DeletePersons()
431 if hasattr(self, 'role_ids'): self.DeleteRoles()
432 if hasattr(self, 'initscript_ids'): self.DeleteInitScripts()
433 if hasattr(self, 'slice_attribute_ids'): self.DeleteSliceAttributes()
434 if hasattr(self, 'slice_ids'): self.DeleteSlices()
435 if hasattr(self, 'slice_instantiations'): self.DeleteSliceInstantiations()
436 if hasattr(self, 'slice_attribute_type_ids'): self.DeleteSliceAttributeTypes()
437 if hasattr(self, 'nodenetwork_setting_ids'): self.DeleteNodeNetworkSettings()
438 if hasattr(self, 'nodenetwork_setting_type_ids'): self.DeleteNodeNetworkSettingTypes()
439 if hasattr(self, 'nodenetwork_ids'): self.DeleteNodeNetworks()
440 if hasattr(self, 'conffile_ids'): self.DeleteConfFiles()
441 if hasattr(self, 'node_ids'): self.DeleteNodes()
442 if hasattr(self, 'nodegroups_ids'): self.DeleteNodeGroups()
443 if hasattr(self, 'network_types'): self.DeleteNetworkTypes()
444 if hasattr(self, 'network_methods'): self.DeleteNetworkMethods()
445 if hasattr(self, 'pcu_ids'): self.DeletePCUs()
446 if hasattr(self, 'pcu_protocol_type_ids'): self.DeletePCUProtocolTypes()
447 if hasattr(self, 'pcu_type_ids'): self.DeletePCUTypes()
448 if hasattr(self, 'address_ids'): self.DeleteAddresses()
449 if hasattr(self, 'address_type_ids'): self.DeleteAddressTypes()
450 if hasattr(self, 'peer_ids'): self.DeletePeers()
451 if hasattr(self, 'site_ids'): self.DeleteSites()
452 if hasattr(self, 'boot_state_ids'): self.DeleteBootStates()
455 def Sites(self, n=4):
459 site_fields = random_site()
460 AddSite = self.debug(api.AddSite)
461 site_id = AddSite(auth, site_fields)
462 if site_id is None: continue
464 # Should return a unique id
465 self.isunique(site_id, site_ids, 'AddSite - isunique')
466 site_ids.append(site_id)
467 GetSites = self.debug(api.GetSites)
468 sites = GetSites(auth, [site_id])
469 if sites is None: continue
471 self.isequal(site, site_fields, 'AddSite - isequal')
474 site_fields = random_site()
475 UpdateSite = self.debug(api.UpdateSite)
476 result = UpdateSite(auth, site_id, site_fields)
479 sites = GetSites(auth, [site_id])
480 if sites is None: continue
482 self.isequal(site, site_fields, 'UpdateSite - isequal')
484 sites = GetSites(auth, site_ids)
485 if sites is not None:
486 self.islistequal(site_ids, [site['site_id'] for site in sites], 'GetSites - isequal')
488 if self.config.verbose:
489 utils.header("Added sites: %s" % site_ids)
def DeleteSites(self):
    """
    Delete every site recorded in self.site_ids, then ask the API for
    those ids again and expect nothing to come back.
    """
    remove_site = self.debug(api.DeleteSite)
    for doomed_id in self.site_ids:
        remove_site(auth, doomed_id)

    # Whatever is still returned for these ids survived the deletion
    fetch_sites = self.debug(api.GetSites)
    survivors = fetch_sites(auth, self.site_ids)
    self.islistequal(survivors, [], 'DeleteSite - check')

    if self.config.verbose:
        utils.header("Deleted sites: %s" % self.site_ids)
510 def NetworkMethods(self, n=2):
512 AddNetworkMethod = self.debug(api.AddNetworkMethod)
513 GetNetworkMethods = self.debug(api.GetNetworkMethods)
517 net_method = randstr(10)
518 AddNetworkMethod(auth, net_method)
519 if net_method is None: continue
521 # Should return a unique id
522 self.isunique(net_method, methods, 'AddNetworkMethod - isunique')
523 methods.append(net_method)
524 net_methods = GetNetworkMethods(auth)
525 if net_methods is None: continue
526 net_methods = filter(lambda x: x in [net_method], net_methods)
527 method = net_methods[0]
528 self.isequal(method, net_method, 'AddNetworkMethod - isequal')
531 net_methods = GetNetworkMethods(auth)
532 if net_methods is not None:
533 net_methods = filter(lambda x: x in methods, net_methods)
534 self.islistequal(methods, net_methods, 'GetNetworkMethods - isequal')
536 if self.config.verbose:
537 utils.header("Added network methods: %s" % methods)
def DeleteNetworkMethods(self):
    """
    Delete every network method recorded in self.network_methods, verify
    that none of them is still reported by the API, then clear the list.
    """
    DeleteNetworkMethod = self.debug(api.DeleteNetworkMethod)
    GetNetworkMethods = self.debug(api.GetNetworkMethods)
    for method in self.network_methods:
        DeleteNetworkMethod(auth, method)

    # Check that the network methods are gone.  GetNetworkMethods is called
    # with auth only everywhere else in this file, so fetch the full list
    # and narrow it down to our own entries here.
    remaining = GetNetworkMethods(auth)
    # None means the call itself failed; the debug wrapper has already
    # recorded that, so there is nothing meaningful left to compare.
    if remaining is not None:
        remaining = [m for m in remaining if m in self.network_methods]
        self.islistequal(remaining, [], 'DeleteNetworkMethods - check')

    if self.config.verbose:
        utils.header("Deleted network methods: %s" % self.network_methods)
    self.network_methods = []
555 def NetworkTypes(self, n=2):
557 AddNetworkType = self.debug(api.AddNetworkType)
558 GetNetworkTypes = self.debug(api.GetNetworkTypes)
563 AddNetworkType(auth, type)
565 # Should return a unique id
566 self.isunique(type, net_types, 'AddNetworkType - isunique')
567 net_types.append(type)
568 types = GetNetworkTypes(auth)
569 if types is None: continue
570 types = filter(lambda x: x in [type], types)
571 if types is None: continue
573 self.isequal(net_type, type, 'AddNetworkType - isequal')
575 types = GetNetworkTypes(auth, net_types)
576 if types is not None:
577 types = filter(lambda x: x in net_types, types)
578 self.islistequal(types, net_types, 'GetNetworkTypes - isequal')
580 if self.config.verbose:
581 utils.header("Added network types: %s" % net_types)
def DeleteNetworkTypes(self):
    """
    Delete every network type recorded in self.network_types, verify
    that none of them is still reported by the API, then clear the list.
    """
    DeleteNetworkType = self.debug(api.DeleteNetworkType)
    GetNetworkTypes = self.debug(api.GetNetworkTypes)
    # "net_type" rather than "type" so the builtin is not shadowed
    for net_type in self.network_types:
        DeleteNetworkType(auth, net_type)

    # Check that the network types are gone.  Fetch the full list with
    # auth only (as the module-level lookup does) and narrow it down to
    # our own entries here.
    remaining = GetNetworkTypes(auth)
    # None means the call itself failed; the debug wrapper has already
    # recorded that, so there is nothing meaningful left to compare.
    if remaining is not None:
        remaining = [t for t in remaining if t in self.network_types]
        self.islistequal(remaining, [], 'DeleteNetworkTypes - check')

    if self.config.verbose:
        utils.header("Deleted network types: %s" % self.network_types)
    self.network_types = []
600 def NodeGroups(self, n = 4):
602 AddNodeGroup = self.debug(api.AddNodeGroup)
603 UpdateNodeGroup = self.debug(api.UpdateNodeGroup)
604 GetNodeGroups = self.debug(api.GetNodeGroups)
608 nodegroup_fields = random_nodegroup()
609 nodegroup_id = AddNodeGroup(auth, nodegroup_fields)
610 if nodegroup_id is None: continue
612 # Should return a unique id
613 self.isunique(nodegroup_id, nodegroup_ids, 'AddNodeGroup - isunique')
614 nodegroup_ids.append(nodegroup_id)
615 nodegroups = GetNodeGroups(auth, [nodegroup_id])
616 if nodegroups is None: continue
617 nodegroup = nodegroups[0]
618 self.isequal(nodegroup, nodegroup_fields, 'AddNodeGroup - isequal')
621 nodegroup_fields = random_nodegroup()
622 UpdateNodeGroup(auth, nodegroup_id, nodegroup_fields)
625 nodegroups = GetNodeGroups(auth, [nodegroup_id])
626 if nodegroups is None: continue
627 nodegroup = nodegroups[0]
628 self.isequal(nodegroup, nodegroup_fields, 'UpdateNodeGroup - isequal')
630 nodegroups = GetNodeGroups(auth, nodegroup_ids)
631 if nodegroups is not None:
632 self.islistequal(nodegroup_ids, [n['nodegroup_id'] for n in nodegroups], 'GetNodeGroups - isequal')
633 if self.config.verbose:
634 utils.header("Added nodegroups: %s" % nodegroup_ids)
def DeleteNodeGroups(self):
    """
    Delete every node group recorded in self.nodegroup_ids, verify that
    the API no longer returns any of them, then clear the list.
    """
    # Delete all NodeGroups
    GetNodeGroups = self.debug(api.GetNodeGroups)
    DeleteNodeGroup = self.debug(api.DeleteNodeGroup)

    for nodegroup_id in self.nodegroup_ids:
        DeleteNodeGroup(auth, nodegroup_id)

    # Check if nodegroups are deleted
    nodegroups = GetNodeGroups(auth, self.nodegroup_ids)
    self.islistequal(nodegroups, [], 'DeleteNodeGroup - check')

    if self.config.verbose:
        utils.header("Deleted nodegroups: %s" % self.nodegroup_ids)

    self.nodegroup_ids = []
655 def PCUTypes(self, n=2):
657 AddPCUType = self.debug(api.AddPCUType)
658 UpdatePCUType = self.debug(api.UpdatePCUType)
659 GetPCUTypes = self.debug(api.GetPCUTypes)
663 pcu_type_fields = random_pcu_type()
664 pcu_type_id = AddPCUType(auth, pcu_type_fields)
665 if pcu_type_id is None: continue
667 # Should return a unique id
668 self.isunique(pcu_type_id, pcu_type_ids, 'AddPCUType - isunique')
669 pcu_type_ids.append(pcu_type_id)
672 pcu_types = GetPCUTypes(auth, [pcu_type_id])
673 if pcu_types is None: continue
674 pcu_type = pcu_types[0]
675 self.isequal(pcu_type, pcu_type_fields, 'AddPCUType - isequal')
678 pcu_type_fields = random_pcu_type()
679 pcu_type_id = UpdatePCUType(auth, pcu_type_id, pcu_type_fields)
682 pcu_types = GetPCUTypes(auth, [pcu_type_id])
683 if pcu_types is None: continue
684 pcu_type = pcu_types[0]
685 self.isequal(pcu_type, pcu_type_fields, 'UpdatePCUType - isequal')
687 pcu_types = GetPCUTypes(auth, pcu_type_ids)
688 if pcu_types is not None:
689 self.islistequal(pcu_type_ids, [p['pcu_type_id'] for p in pcu_types], 'GetPCUTypes - check')
691 if self.config.verbose:
692 utils.header("Added pcu_types: %s " % pcu_type_ids)
def DeletePCUTypes(self):
    """
    Delete every PCU type recorded in self.pcu_type_ids and expect a
    follow-up query for those ids to return nothing.
    """
    fetch_types = self.debug(api.GetPCUTypes)
    remove_type = self.debug(api.DeletePCUType)

    for doomed_id in self.pcu_type_ids:
        remove_type(auth, doomed_id)

    # Anything still returned for these ids survived the deletion
    self.islistequal(fetch_types(auth, self.pcu_type_ids), [],
                     'DeletePCUType - check')
705 def PCUProtocolTypes(self, n=2):
706 protocol_type_ids = []
707 AddPCUProtocolType = self.debug(api.AddPCUProtocolType)
708 UpdatePCUProtocolType = self.debug(api.UpdatePCUProtocolType)
709 GetPCUProtocolTypes = self.debug(api.GetPCUProtocolTypes)
712 # Add PCUProtocolType
713 protocol_type_fields = random_pcu_protocol_type()
714 pcu_type_id = random.sample(self.pcu_type_ids, 1)[0]
715 protocol_type_id = AddPCUProtocolType(auth, pcu_type_id, protocol_type_fields)
716 if protocol_type_id is None: continue
718 # Should return a unique id
719 self.isunique(protocol_type_id, protocol_type_ids, 'AddPCUProtocolType - isunique')
720 protocol_type_ids.append(protocol_type_id)
722 # Check protocol type
723 protocol_types = GetPCUProtocolTypes(auth, [protocol_type_id])
724 if protocol_types is None: continue
725 protocol_type = protocol_types[0]
726 self.isequal(protocol_type, protocol_type_fields, 'AddPCUProtocolType - isequal')
728 # Update protocol type
729 protocol_type_fields = random_pcu_protocol_type()
730 UpdatePCUProtocolType(auth, protocol_type_id, protocol_type_fields)
733 protocol_types = GetPCUProtocolTypes(auth, [protocol_type_id])
734 if protocol_types is None: continue
735 protocol_type = protocol_types[0]
736 self.isequal(protocol_type, protocol_type_fields, 'UpdatePCUProtocolType - isequal')
738 protocol_types = GetPCUProtocolTypes(auth, protocol_type_ids)
739 if protocol_types is not None:
740 pt_ids = [p['pcu_protocol_type_id'] for p in protocol_types]
741 self.islistequal(protocol_type_ids, pt_ids, 'GetPCUProtocolTypes - isequal')
743 if self.config.verbose:
744 utils.header('Added pcu_protocol_types: %s' % protocol_type_ids)
746 return protocol_type_ids
def DeletePCUProtocolTypes(self):
    """
    Delete every PCU protocol type recorded in self.pcu_protocol_type_ids,
    expect a follow-up query for those ids to return nothing, then clear
    the list.
    """
    fetch_types = self.debug(api.GetPCUProtocolTypes)
    remove_type = self.debug(api.DeletePCUProtocolType)

    for doomed_id in self.pcu_protocol_type_ids:
        remove_type(auth, doomed_id)

    # Anything still returned for these ids survived the deletion
    survivors = fetch_types(auth, self.pcu_protocol_type_ids)
    self.islistequal(survivors, [], 'DeletePCUProtocolType - check')

    if self.config.verbose:
        utils.header("Deleted pcu_protocol_types: %s" % self.pcu_protocol_type_ids)
    self.pcu_protocol_type_ids = []
763 def PCUs(self, n = 4):
765 AddPCU = self.debug(api.AddPCU)
766 UpdatePCU = self.debug(api.UpdatePCU)
767 GetPCUs = self.debug(api.GetPCUs)
771 pcu_fields = random_pcu()
772 site_id = random.sample(self.site_ids, 1)[0]
773 pcu_id = AddPCU(auth, site_id, pcu_fields)
774 if pcu_id is None: continue
776 # Should return a unique id
777 self.isunique(pcu_id, pcu_ids, 'AddPCU - isunique')
778 pcu_ids.append(pcu_id)
781 pcus = GetPCUs(auth, [pcu_id])
782 if pcus is None: continue
784 self.isequal(pcu, pcu_fields, 'AddPCU - isequal')
787 pcu_fields = random_pcu()
788 UpdatePCU(auth, pcu_id, pcu_fields)
791 pcus = GetPCUs(auth, [pcu_id])
792 if pcus is None: continue
794 self.isequal(pcu, pcu_fields, 'UpdatePCU - isequal')
796 pcus = GetPCUs(auth, pcu_ids)
798 self.islistequal(pcu_ids, [p['pcu_id'] for p in pcus], 'GetPCUs - isequal')
800 if self.config.verbose:
801 utils.header('Added pcus: %s' % pcu_ids)
def DeletePCUs(self):
    """
    Delete every PCU recorded in self.pcu_ids and expect a follow-up
    query for those ids to return nothing.
    """
    fetch_pcus = self.debug(api.GetPCUs)
    remove_pcu = self.debug(api.DeletePCU)

    for doomed_id in self.pcu_ids:
        remove_pcu(auth, doomed_id)

    # Anything still returned for these ids survived the deletion
    survivors = fetch_pcus(auth, self.pcu_ids)
    self.islistequal(survivors, [], 'DeletePCU - check')

    if self.config.verbose:
        utils.header("Deleted pcus: %s " % self.pcu_ids)
820 def Nodes(self, n=4):
824 node_fields = random_node()
825 site_id = random.sample(self.site_ids, 1)[0]
826 AddNode = self.debug(api.AddNode)
827 node_id = AddNode(auth, site_id, node_fields)
828 if node_id is None: continue
830 # Should return a unique id
831 self.isunique(node_id, node_ids, 'AddNode - isunique')
832 node_ids.append(node_id)
835 GetNodes = self.debug(api.GetNodes)
836 nodes = GetNodes(auth, [node_id])
837 if nodes is None: continue
839 self.isequal(node, node_fields, 'AddNode - isequal')
842 node_fields = random_node()
843 UpdateNode = self.debug(api.UpdateNode)
844 result = UpdateNode(auth, node_id, node_fields)
847 nodes = GetNodes(auth, [node_id])
848 if nodes is None: continue
850 self.isequal(node, node_fields, 'UpdateNode - isequal')
852 # Add node to nodegroup
853 nodegroup_id = random.sample(self.nodegroup_ids, 1)[0]
854 AddNodeToNodeGroup = self.debug(api.AddNodeToNodeGroup)
855 AddNodeToNodeGroup(auth, node_id, nodegroup_id)
858 pcu_id = random.sample(self.pcu_ids, 1)[0]
859 AddNodeToPCU = self.debug(api.AddNodeToPCU)
860 AddNodeToPCU(auth, node_id, nodegroup_id)
862 # check nodegroup, pcu
863 nodes = GetNodes(auth, [node_id], ['nodegroup_ids', 'pcu_ids'])
864 if nodes is None or not nodes: continue
866 self.islistequal([nodegroup_id], node['nodegroup_ids'], 'AddNodeToNodeGroup - check')
867 self.islistequal([pcu_id], node['pcu_ids'], 'AddNodeToPCU - check')
869 nodes = GetNodes(auth, node_ids)
870 if nodes is not None:
871 self.islistequal(node_ids, [node['node_id'] for node in nodes], 'GetNodes - isequal')
873 if self.config.verbose:
874 utils.header("Added nodes: %s" % node_ids)
878 def DeleteNodes(self):
880 # Delete attributes manually for first node
881 GetNodes = self.debug(api.GetNodes)
882 nodes = GetNodes(auth, self.node_ids)
883 if nodes is None or not nodes: return 0
886 if node['nodegroup_ids']:
888 nodegroup_id = random.sample(node['nodegroup_ids'], 1)[0]
889 DeleteNodeFromNodeGroup = self.debug(api.DeleteNodeFromNodeGroup)
890 DeleteNodeFromNodeGroup(auth, node['node_id'], nodegroup_id)
894 pcu_id = random.sample(node['pcu_ids'], 1)[0]
895 DeleteNodeFromPCU = self.debug(api.DeleteNodeFromPCU)
896 DeleteNodeFromPCU(auth, node['node_id'], pcu_id)
898 # check nodegroup, pcu
899 nodes = GetNodes(auth, [node['node_id']])
900 if nodes is None or not nodes: return 0
901 self.islistequal([], node['nodegroup_ids'], 'DeleteNodeGromNodeGroup - check')
902 self.islistequal([], node['pcu_ids'], 'DeleteNodeFromPCU - check')
904 # Delete rest of nodes
905 DeleteNode = self.debug(api.DeleteNode)
906 for node_id in self.node_ids:
907 result = DeleteNode(auth, node_id)
909 # Check if nodes are deleted
910 GetNodes = self.debug(api.GetNodes)
911 nodes = GetNodes(auth, self.node_ids)
912 self.islistequal(nodes, [], 'DeleteNode Check')
914 if self.config.verbose:
915 utils.header("Deleted nodes: %s" % self.node_ids)
919 def AddressTypes(self, n = 3):
920 address_type_ids = []
922 address_type_fields = random_address_type()
923 AddAddressType = self.debug(api.AddAddressType)
924 address_type_id = AddAddressType(auth, address_type_fields)
925 if address_type_id is None: continue
927 # Should return a unique address_type_id
928 self.isunique(address_type_id, address_type_ids, 'AddAddressType - isunique')
929 address_type_ids.append(address_type_id)
932 GetAddressTypes = self.debug(api.GetAddressTypes)
933 address_types = GetAddressTypes(auth, [address_type_id])
934 if address_types is None: continue
935 address_type = address_types[0]
936 self.isequal(address_type, address_type_fields, 'AddAddressType - isequal')
938 # Update address type
939 address_type_fields = random_address_type()
940 UpdateAddressType = self.debug(api.UpdateAddressType)
941 result = UpdateAddressType(auth, address_type_id, address_type_fields)
942 if result is None: continue
944 # Check address type again
945 address_types = GetAddressTypes(auth, [address_type_id])
946 if address_types is None: continue
947 address_type = address_types[0]
948 self.isequal(address_type, address_type_fields, 'UpdateAddressType - isequal')
950 # Check get all address types
951 address_types = GetAddressTypes(auth, address_type_ids)
952 if address_types is not None:
953 self.islistequal(address_type_ids, [address_type['address_type_id'] for address_type in address_types], 'GetAddressTypes - isequal')
955 if self.config.verbose:
956 utils.header("Added address types: %s " % address_type_ids)
958 return address_type_ids
def DeleteAddressTypes(self):
    """
    Delete every address type recorded in self.address_type_ids, verify
    that the API no longer returns any of them, then clear the list.
    """
    DeleteAddressType = self.debug(api.DeleteAddressType)
    for address_type_id in self.address_type_ids:
        DeleteAddressType(auth, address_type_id)

    # Check that the address types are gone
    GetAddressTypes = self.debug(api.GetAddressTypes)
    address_types = GetAddressTypes(auth, self.address_type_ids)
    self.islistequal(address_types, [], 'DeleteAddressType - check')

    if self.config.verbose:
        # The format string needs a %s placeholder; without one the %
        # operator raises TypeError instead of printing the ids.
        utils.header("Deleted address types: %s" % self.address_type_ids)

    self.address_type_ids = []
975 def Addresses(self, n = 3):
978 address_fields = random_address()
979 site_id = random.sample(self.site_ids, 1)[0]
980 AddSiteAddress = self.debug(api.AddSiteAddress)
981 address_id = AddSiteAddress(auth, site_id, address_fields)
982 if address_id is None: continue
984 # Should return a unique address_id
985 self.isunique(address_id, address_ids, 'AddSiteAddress - isunique')
986 address_ids.append(address_id)
989 GetAddresses = self.debug(api.GetAddresses)
990 addresses = GetAddresses(auth, [address_id])
991 if addresses is None: continue
992 address = addresses[0]
993 self.isequal(address, address_fields, 'AddSiteAddress - isequal')
996 address_fields = random_address()
997 UpdateAddress = self.debug(api.UpdateAddress)
998 result = UpdateAddress(auth, address_id, address_fields)
1001 addresses = GetAddresses(auth, [address_id])
1002 if addresses is None: continue
1003 address = addresses[0]
1004 self.isequal(address, address_fields, 'UpdateAddress - isequal')
1006 addresses = GetAddresses(auth, address_ids)
1007 if addresses is not None:
1008 self.islistequal(address_ids, [ad['address_id'] for ad in addresses], 'GetAddresses - isequal')
1010 if self.config.verbose:
1011 utils.header("Added addresses: %s" % address_ids)
def DeleteAddresses(self):
    """
    Delete every site address recorded in self.address_ids, expect a
    follow-up query for those ids to return nothing, then clear the list.
    """
    remove_address = self.debug(api.DeleteAddress)
    # Delete site addresses
    for doomed_id in self.address_ids:
        remove_address(auth, doomed_id)

    # Anything still returned for these ids survived the deletion
    fetch_addresses = self.debug(api.GetAddresses)
    survivors = fetch_addresses(auth, self.address_ids)
    self.islistequal(survivors, [], 'DeleteAddress - check')
    if self.config.verbose:
        utils.header("Deleted addresses: %s" % self.address_ids)

    self.address_ids = []
def SliceAttributeTypes(self, n = 2):
    """
    Exercise the add / get / update calls for slice attribute types.

    Creates n slice attribute types from random fields, reads each one
    back to compare it with what was sent, updates it with fresh random
    fields and compares again.  Finally fetches all created types at once
    and checks that the returned ids match.

    Returns the list of attribute_type_ids that were created.
    """
    attribute_type_ids = []
    # Wrap the *type* methods; the names must match the API calls under
    # test so that they are recorded correctly by self.debug.
    AddSliceAttributeType = self.debug(api.AddSliceAttributeType)
    GetSliceAttributeTypes = self.debug(api.GetSliceAttributeTypes)
    UpdateSliceAttributeType = self.debug(api.UpdateSliceAttributeType)

    for i in range(n):
        attribute_type_fields = random_attribute_type()
        attribute_type_id = AddSliceAttributeType(auth, attribute_type_fields)
        if attribute_type_id is None: continue

        # Should return a unique slice_attribute_type_id
        self.isunique(attribute_type_id, attribute_type_ids, 'AddSliceAttributeType - isunique')
        attribute_type_ids.append(attribute_type_id)

        # Check slice_attribute_type
        attribute_types = GetSliceAttributeTypes(auth, [attribute_type_id])
        # None (call failed) or an empty result: nothing to compare
        if not attribute_types: continue
        attribute_type = attribute_types[0]
        self.isequal(attribute_type, attribute_type_fields, 'AddSliceAttributeType - isequal')

        # Update slice_attribute_type
        attribute_type_fields = random_attribute_type()
        UpdateSliceAttributeType(auth, attribute_type_id, attribute_type_fields)

        # Check slice_attribute_type again
        attribute_types = GetSliceAttributeTypes(auth, [attribute_type_id])
        if not attribute_types: continue
        attribute_type = attribute_types[0]
        self.isequal(attribute_type, attribute_type_fields, 'UpdateSliceAttributeType - isequal')

    # Check that fetching all created types returns exactly our ids
    attribute_types = GetSliceAttributeTypes(auth, attribute_type_ids)
    if attribute_types is not None:
        at_ids = [at['attribute_type_id'] for at in attribute_types]
        self.islistequal(attribute_type_ids, at_ids, 'GetSliceAttributeTypes - isequal')

    if self.config.verbose:
        utils.header("Added slice_attribute_types: %s" % attribute_type_ids)

    return attribute_type_ids
def DeleteSliceAttributeTypes(self):
    """
    Delete every slice attribute type recorded in
    self.slice_attribute_type_ids, verify that the API no longer returns
    any of them, then clear the list.
    """
    DeleteSliceAttributeType = self.debug(api.DeleteSliceAttributeType)
    GetSliceAttributeTypes = self.debug(api.GetSliceAttributeTypes)

    # Delete slice_attribute_type
    for slice_attribute_type_id in self.slice_attribute_type_ids:
        DeleteSliceAttributeType(auth, slice_attribute_type_id)

    # Check that the slice attribute types are gone.  The same attribute
    # name (slice_attribute_type_ids) is used for the loop, the query, the
    # log message and the final reset.
    slice_attribute_types = GetSliceAttributeTypes(auth, self.slice_attribute_type_ids)
    self.islistequal(slice_attribute_types, [], 'DeleteSliceAttributeTypes - check')
    if self.config.verbose:
        utils.header("Deleted slice_attributes: %s" % self.slice_attribute_type_ids)

    self.slice_attribute_type_ids = []
def SliceInstantiations(self, n = 2):
    """
    Add n randomly named slice instantiations and verify each of them.

    The API methods are called through self.debug() wrappers.  A None
    result from the add call is treated as a failed call and that
    iteration is skipped.

    Returns the list of instantiation names that were added.
    """
    insts = []
    AddSliceInstantiation = self.debug(api.AddSliceInstantiation)
    GetSliceInstantiations = self.debug(api.GetSliceInstantiations)

    for i in range(n):
        inst = randstr(10)
        result = AddSliceInstantiation(auth, inst)
        if result is None:
            continue
        insts.append(inst)

        # Check slice instantiation
        instantiations = GetSliceInstantiations(auth)
        if instantiations is None:
            continue
        # A list comprehension (not filter) so the result can be indexed
        # and truth-tested on any Python version.
        matches = [x for x in instantiations if x == inst]
        if not matches:
            continue
        self.isequal(matches[0], inst, 'AddSliceInstantiation - isequal')

    # GetSliceInstantiations is called with auth only everywhere else in
    # this file, so fetch everything and narrow it down to what we added.
    instantiations = GetSliceInstantiations(auth)
    if instantiations is not None:
        added = [x for x in instantiations if x in insts]
        self.islistequal(insts, added, 'GetSliceInstantiations - isequal')

    if self.config.verbose:
        utils.header("Added slice instantiations: %s" % insts)

    return insts
def DeleteSliceInstantiations(self):
    """
    Delete every instantiation listed in self.slice_instantiations, check
    that none of them is still reported by the API, then reset the list.
    """
    DeleteSliceInstantiation = self.debug(api.DeleteSliceInstantiation)
    GetSliceInstantiations = self.debug(api.GetSliceInstantiations)

    # Delete slice instantiation
    for instantiation in self.slice_instantiations:
        DeleteSliceInstantiation(auth, instantiation)

    # Check that none of the deleted instantiations is still listed.
    # A None result cannot be iterated, so the check is skipped then.
    instantiations = GetSliceInstantiations(auth)
    if instantiations is not None:
        remaining = [x for x in instantiations if x in self.slice_instantiations]
        self.islistequal(remaining, [], 'DeleteSliceInstantiation - check')

    if self.config.verbose:
        # The format string needs a %s for the list to show up at all
        utils.header("Deleted slice instantiations: %s" % self.slice_instantiations)

    self.slice_instantiations = []
def Slices(self, n = 3):
    """
    Add, verify and update n slices, attaching each one to a node picked
    at random from self.node_ids.

    The API methods are called through self.debug() wrappers.  A None
    result from the add call is treated as a failed call and that
    iteration is skipped.

    Returns the list of ids of the slices that were added.
    """
    slice_ids = []
    AddSlice = self.debug(api.AddSlice)
    GetSlices = self.debug(api.GetSlices)
    UpdateSlice = self.debug(api.UpdateSlice)
    AddSliceToNodes = self.debug(api.AddSliceToNodes)

    for i in range(n):
        # Add slice
        slice_fields = random_slice()
        slice_id = AddSlice(auth, slice_fields)
        if slice_id is None:
            continue

        # Should return a unique id
        self.isunique(slice_id, slice_ids, 'AddSlice - isunique')
        slice_ids.append(slice_id)
        slices = GetSlices(auth, [slice_id])
        if not slices:
            continue
        self.isequal(slices[0], slice_fields, 'AddSlice - isequal')

        # Update slice
        slice_fields = random_slice()
        UpdateSlice(auth, slice_id, slice_fields)

        # Check again
        slices = GetSlices(auth, [slice_id])
        if not slices:
            continue
        self.isequal(slices[0], slice_fields, 'UpdateSlice - isequal')

        # Add node; AddSliceToNodes works on a list of nodes, mirroring
        # the list handed to DeleteSliceFromNodes in DeleteSlices.
        node_id = random.sample(self.node_ids, 1)[0]
        AddSliceToNodes(auth, slice_id, [node_id])

        # Check node
        slices = GetSlices(auth, [slice_id], ['node_ids'])
        if not slices:
            continue
        self.islistequal([node_id], slices[0]['node_ids'], 'AddSliceToNode - check')

    # Fetch everything that was added in one call and compare the id lists
    slices = GetSlices(auth, slice_ids)
    if slices is not None:
        self.islistequal(slice_ids, [s['slice_id'] for s in slices], 'GetSlices - isequal')

    if self.config.verbose:
        utils.header("Added slices: %s" % slice_ids)

    return slice_ids
def DeleteSlices(self):
    """
    Detach one node from the first slice by hand, then delete every slice
    in self.slice_ids and check that none can be fetched afterwards.

    Returns 0 early, without deleting anything, when the slices cannot be
    fetched.  Otherwise self.slice_ids is reset and nothing is returned.
    """
    GetSlices = self.debug(api.GetSlices)
    DeleteSlice = self.debug(api.DeleteSlice)
    DeleteSliceFromNodes = self.debug(api.DeleteSliceFromNodes)

    # manually delete attributes for first slice
    slices = GetSlices(auth, self.slice_ids, ['slice_id', 'node_ids'])
    if not slices:
        return 0
    first_slice = slices[0]

    if first_slice['node_ids']:
        # Delete node from slice; auth must come first, as in every
        # other API call in this file.
        node_id = random.sample(first_slice['node_ids'], 1)[0]
        DeleteSliceFromNodes(auth, first_slice['slice_id'], [node_id])

    # Check node
    slices = GetSlices(auth, [first_slice['slice_id']], ['node_ids'])
    if not slices:
        return 0
    self.islistequal([], slices[0]['node_ids'], 'DeleteSliceFromNode - check')

    # Have DeleteSlice automatically delete attributes for the rest
    for slice_id in self.slice_ids:
        DeleteSlice(auth, slice_id)

    # Check if slices are deleted
    slices = GetSlices(auth, self.slice_ids)
    self.islistequal(slices, [], 'DeleteSlice - check')

    if self.config.verbose:
        utils.header("Deleted slices: %s" % self.slice_ids)

    self.slice_ids = []
def SliceAttributes(self, n = 4):
    """
    Add, verify and update n slice attributes, each on a slice picked at
    random from self.slice_ids.

    The API methods are called through self.debug() wrappers.  A None
    result from the add call is treated as a failed call and that
    iteration is skipped.

    Returns the list of ids of the slice attributes that were added.
    """
    attribute_ids = []
    AddSliceAttribute = self.debug(api.AddSliceAttribute)
    GetSliceAttributes = self.debug(api.GetSliceAttributes)
    UpdateSliceAttribute = self.debug(api.UpdateSliceAttribute)

    for i in range(n):
        # Add slice attribute
        attribute_fields = random_slice_attribute()
        slice_id = random.sample(self.slice_ids, 1)[0]
        attribute_id = AddSliceAttribute(auth, slice_id, attribute_fields)
        if attribute_id is None:
            continue

        # Should return a unique id
        self.isunique(attribute_id, attribute_ids, 'AddSliceAttribute - isunique')
        attribute_ids.append(attribute_id)

        # Check attribute
        attributes = GetSliceAttributes(auth, [attribute_id])
        if not attributes:
            continue
        self.isequal(attributes[0], attribute_fields, 'AddSliceAttribute - isequal')

        # Update attribute, using the same generator as the add step so
        # the update carries slice-attribute fields.
        attribute_fields = random_slice_attribute()
        UpdateSliceAttribute(auth, attribute_id, attribute_fields)

        # Check again
        attributes = GetSliceAttributes(auth, [attribute_id])
        if not attributes:
            continue
        self.isequal(attributes[0], attribute_fields, 'UpdateSliceAttribute - isequal')

    # Fetch everything that was added in one call and compare the id lists
    attributes = GetSliceAttributes(auth, attribute_ids)
    if attributes is not None:
        attr_ids = [a['attribute_id'] for a in attributes]
        self.islistequal(attribute_ids, attr_ids, 'GetSliceAttributes - isequal')

    if self.config.verbose:
        utils.header("Added slice attributes: %s" % attribute_ids)

    return attribute_ids
def DeleteSliceAttributes(self):
    """
    Delete every slice attribute listed in self.slice_attribute_ids,
    check that a lookup of those ids matches an empty list, then reset
    the list.
    """
    remove_attribute = self.debug(api.DeleteSliceAttribute)
    fetch_attributes = self.debug(api.GetSliceAttributes)

    # Remove the attributes one at a time
    for doomed_id in self.slice_attribute_ids:
        remove_attribute(auth, doomed_id)

    # Nothing should be left under the old ids
    leftovers = fetch_attributes(auth, self.slice_attribute_ids)
    self.islistequal(leftovers, [], 'DeleteSliceAttribute - check')

    if self.config.verbose:
        summary = "Deleted slice attributes: %s" % self.slice_attribute_ids
        utils.header(summary)

    self.slice_attribute_ids = []
def Persons(self, n = 3):
    """
    Add n accounts.  Each account is verified, updated, and then given a
    random role, a key, a site from self.site_ids and a slice from
    self.slice_ids, and those associations are checked.

    The API methods are called through self.debug() wrappers.  A None
    result from the add call is treated as a failed call and that
    iteration is skipped.

    Returns the list of ids of the accounts that were added.
    """
    person_ids = []
    # Build every wrapper up front so that GetPersons is always bound for
    # the final check, even when n is 0 or every AddPerson call fails.
    AddPerson = self.debug(api.AddPerson)
    GetPersons = self.debug(api.GetPersons)
    UpdatePerson = self.debug(api.UpdatePerson)
    AddRoleToPerson = self.debug(api.AddRoleToPerson)
    AddPersonKey = self.debug(api.AddPersonKey)
    AddPersonToSite = self.debug(api.AddPersonToSite)
    AddPersonToSlice = self.debug(api.AddPersonToSlice)

    for i in range(n):
        # Add account
        person_fields = random_person()
        person_id = AddPerson(auth, person_fields)
        if person_id is None:
            continue

        # Should return a unique person_id
        self.isunique(person_id, person_ids, 'AddPerson - isunique')
        person_ids.append(person_id)
        persons = GetPersons(auth, [person_id])
        if not persons:
            continue
        self.isequal(persons[0], person_fields, 'AddPerson - isequal')

        # Update account
        person_fields = random_person()
        person_fields['enabled'] = True
        UpdatePerson(auth, person_id, person_fields)

        # Add random role
        role = random.sample(roles, 1)[0]
        AddRoleToPerson(auth, role, person_id)

        # Add key; keep the id the API hands back so it can be compared
        # against the account's key_ids below.
        key = random_key()
        key_id = AddPersonKey(auth, person_id, key)

        # Add person to site
        site_id = random.sample(self.site_ids, 1)[0]
        AddPersonToSite(auth, person_id, site_id)

        # Add person to slice
        slice_id = random.sample(self.slice_ids, 1)[0]
        AddPersonToSlice(auth, person_id, slice_id)

        # check role, key, site, slice
        persons = GetPersons(auth, [person_id], ['roles', 'key_ids', 'site_ids', 'slice_ids'])
        if not persons:
            continue
        person = persons[0]
        self.islistequal([role], person['roles'], 'AddRoleToPerson - check')
        self.islistequal([key_id], person['key_ids'], 'AddPersonKey - check')
        self.islistequal([site_id], person['site_ids'], 'AddPersonToSite - check')
        self.islistequal([slice_id], person['slice_ids'], 'AddPersonToSlice - check')

    # Fetch everything that was added in one call and compare the id lists
    persons = GetPersons(auth, person_ids)
    if persons is not None:
        self.islistequal(person_ids, [p['person_id'] for p in persons], 'GetPersons - isequal')

    if self.config.verbose:
        utils.header("Added users: %s" % person_ids)

    return person_ids
def DeletePersons(self):
    """
    Strip one role, key, site and slice from the first account by hand
    and check the result, then delete every account in self.person_ids
    and check that none can be fetched afterwards.

    Returns 0 early when the accounts cannot be fetched; otherwise
    self.person_ids is reset and nothing is returned.
    """
    # Delete attributes manually for first person
    fetch_persons = self.debug(api.GetPersons)
    found = fetch_persons(auth, self.person_ids, ['person_id' , 'key_ids', 'site_ids', 'slice_ids', 'roles'])
    if not found:
        return 0
    first = found[0]

    if first['roles']:
        # Drop one randomly chosen role
        chosen_role = random.sample(first['roles'], 1)[0]
        drop_role = self.debug(api.DeleteRoleFromPerson)
        drop_role(auth, chosen_role, first['person_id'])

    if first['key_ids']:
        # Drop one randomly chosen key
        chosen_key = random.sample(first['key_ids'], 1)[0]
        drop_key = self.debug(api.DeleteKey)
        drop_key(auth, chosen_key)

    if first['site_ids']:
        # Detach the account from one randomly chosen site
        chosen_site = random.sample(first['site_ids'], 1)[0]
        leave_site = self.debug(api.DeletePersonFromSite)
        leave_site(auth, first['person_id'], chosen_site)

    if first['slice_ids']:
        # Detach the account from one randomly chosen slice
        chosen_slice = random.sample(first['slice_ids'], 1)[0]
        leave_slice = self.debug(api.DeletePersonFromSlice)
        leave_slice(auth, first['person_id'], chosen_slice)

    # Re-read the account and expect every association list to be empty
    found = fetch_persons(auth, [first['person_id']], ['roles', 'key_ids', 'site_ids', 'slice_ids'])
    if not found:
        return 0
    first = found[0]
    self.islistequal([], first['roles'], 'DeleteRoleFromPerson - check')
    self.islistequal([], first['key_ids'], 'DeleteKey - check')
    self.islistequal([], first['site_ids'], 'DeletePersonFromSite - check')
    self.islistequal([], first['slice_ids'], 'DeletePersonFromSlice - check')

    remove_person = self.debug(api.DeletePerson)
    # Let DeletePerson clean up the associations of all other accounts
    for doomed_id in self.person_ids:
        remove_person(auth, doomed_id)

    # Nothing should be left under the old ids
    fetch_persons = self.debug(api.GetPersons)
    leftovers = fetch_persons(auth, self.person_ids)
    self.islistequal(leftovers, [], 'DeletePerson - check')

    if self.config.verbose:
        utils.header("Deleted users: %s" % self.person_ids)

    self.person_ids = []
def Keys(self, n = 3):
    """
    Add, verify and update n keys, each on an account picked at random
    from self.person_ids.

    The API methods are called through self.debug() wrappers.  A None
    result from the add call is treated as a failed call and that
    iteration is skipped.

    Returns the list of ids of the keys that were added.
    """
    key_ids = []
    # Build every wrapper up front so that GetKeys is always bound for
    # the final check, even when n is 0 or every AddPersonKey call fails.
    AddPersonKey = self.debug(api.AddPersonKey)
    GetKeys = self.debug(api.GetKeys)
    UpdateKey = self.debug(api.UpdateKey)

    for i in range(n):
        # Add a key to an account
        key_fields = random_key()
        person_id = random.sample(self.person_ids, 1)[0]
        key_id = AddPersonKey(auth, person_id, key_fields)
        if key_id is None:
            continue

        # Should return a unique key_id
        self.isunique(key_id, key_ids, 'AddPersonKey - isunique')
        key_ids.append(key_id)
        keys = GetKeys(auth, [key_id])
        if not keys:
            continue
        self.isequal(keys[0], key_fields, 'AddPersonKey - isequal')

        # Update key
        key_fields = random_key()
        UpdateKey(auth, key_id, key_fields)

        # Check again
        keys = GetKeys(auth, [key_id])
        if not keys:
            continue
        self.isequal(keys[0], key_fields, 'UpdateKey - isequal')

    # Fetch everything that was added in one call and compare the id lists
    keys = GetKeys(auth, key_ids)
    if keys is not None:
        self.islistequal(key_ids, [key['key_id'] for key in keys], 'GetKeys - isequal')

    if self.config.verbose:
        utils.header("Added keys: %s" % key_ids)

    return key_ids
def DeleteKeys(self):
    """
    Blacklist the first key in self.key_ids and check that it can no
    longer be fetched, then delete every key in self.key_ids and check
    that none can be fetched afterwards.

    Returns 0 early when the keys cannot be fetched; otherwise
    self.key_ids is reset and nothing is returned.
    """
    # Blacklist first key, Delete rest
    fetch_keys = self.debug(api.GetKeys)
    found = fetch_keys(auth, self.key_ids)
    if not found:
        return 0
    first = found[0]

    blacklist = self.debug(api.BlacklistKey)
    blacklist(auth, first['key_id'])

    # The blacklisted key should no longer be returned
    found = fetch_keys(auth, [first['key_id']])
    self.islistequal(found, [], 'BlacklistKey - check')

    if self.config.verbose:
        utils.header("Blacklisted key: %s" % first['key_id'])

    remove_key = self.debug(api.DeleteKey)
    for doomed_id in self.key_ids:
        remove_key(auth, doomed_id)

    # Nothing should be left under the old ids
    leftovers = fetch_keys(auth, self.key_ids)
    self.islistequal(leftovers, [], 'DeleteKey - check')

    if self.config.verbose:
        utils.header("Deleted keys: %s" % self.key_ids)

    self.key_ids = []
def BootStates(self, n = 3):
    """
    Add n randomly named boot states and check that each one is reported
    back by GetBootStates.

    The API methods are called through self.debug() wrappers.  A None
    result from the add call is treated as a failed call and that
    iteration is skipped.

    Returns the list of boot state names that were added.
    """
    boot_states = []
    AddBootState = self.debug(api.AddBootState)
    GetBootStates = self.debug(api.GetBootStates)

    for i in range(n):
        # Add boot state
        bootstate_fields = randstr(10)
        result = AddBootState(auth, bootstate_fields)
        if result is None:
            continue

        # Check boot state
        boot_states.append(bootstate_fields)
        bootstates = GetBootStates(auth)
        if not bootstates:
            continue
        # A list comprehension (not filter) so the result can be indexed
        # and truth-tested on any Python version.
        matches = [x for x in bootstates if x == bootstate_fields]
        if not matches:
            continue
        self.isequal(matches[0], bootstate_fields, 'AddBootState - isequal')

    # Check all: keep only the states added here.  Membership is tested
    # against the list itself, not against a list wrapping it.
    bs = GetBootStates(auth)
    if bs is not None:
        bs = [x for x in bs if x in boot_states]
        self.islistequal(boot_states, bs, 'GetBootStates - isequal')

    if self.config.verbose:
        utils.header("Added boot_states: %s" % boot_states)

    return boot_states
def DeleteBootStates(self):
    """
    Delete every boot state listed in self.boot_states, check that none
    of them is still reported by the API, then reset the list.
    """
    DeleteBootState = self.debug(api.DeleteBootState)
    GetBootStates = self.debug(api.GetBootStates)

    for boot_state in self.boot_states:
        DeleteBootState(auth, boot_state)

    # Check if boot states are deleted.  GetBootStates is called with
    # auth only everywhere else in this file, so fetch everything and
    # keep just the names this test created.
    current_states = GetBootStates(auth)
    if current_states is not None:
        remaining = [x for x in current_states if x in self.boot_states]
        self.islistequal(remaining, [], 'DeleteBootState check')

    if self.config.verbose:
        utils.header("Deleted boot_states: %s" % self.boot_states)

    self.boot_states = []
def Peers(self, n = 2):
    """
    Add, verify and update n peers.

    The API methods are called through self.debug() wrappers.  A None
    result from the add call is treated as a failed call and that
    iteration is skipped, as in the other Add tests of this class.

    Returns the list of ids of the peers that were added.
    """
    peer_ids = []
    # Build every wrapper up front so that GetPeers is always bound for
    # the final check, even when n is 0.
    AddPeer = self.debug(api.AddPeer)
    GetPeers = self.debug(api.GetPeers)
    UpdatePeer = self.debug(api.UpdatePeer)

    for i in range(n):
        # Add peer
        peer_fields = random_peer()
        peer_id = AddPeer(auth, peer_fields)
        if peer_id is None:
            continue

        # Should return a unique id
        self.isunique(peer_id, peer_ids, 'AddPeer - isunique')
        peer_ids.append(peer_id)
        peers = GetPeers(auth, [peer_id])
        if not peers:
            continue
        self.isequal(peers[0], peer_fields, 'AddPeer - isequal')

        # Update peer
        peer_fields = random_peer()
        UpdatePeer(auth, peer_id, peer_fields)

        # Check again
        peers = GetPeers(auth, [peer_id])
        if not peers:
            continue
        self.isequal(peers[0], peer_fields, 'UpdatePeer - isequal')

    # Fetch everything that was added in one call and compare the id lists
    peers = GetPeers(auth, peer_ids)
    if peers is not None:
        self.islistequal(peer_ids, [peer['peer_id'] for peer in peers], 'GetPeers -isequal')

    if self.config.verbose:
        utils.header("Added peers: %s" % peer_ids)

    return peer_ids
def DeletePeers(self):
    """
    Delete every peer listed in self.peer_ids, check that none of them
    can be fetched afterwards, then reset the list.
    """
    DeletePeer = self.debug(api.DeletePeer)
    for peer_id in self.peer_ids:
        DeletePeer(auth, peer_id)

    # Check if peers are deleted
    GetPeers = self.debug(api.GetPeers)
    peers = GetPeers(auth, self.peer_ids)
    # The label is a plain string: it has no conversion specifier, so it
    # must not be combined with the % operator.
    self.islistequal(peers, [], 'DeletePeer - check')

    if self.config.verbose:
        utils.header("Deleted peers: %s" % self.peer_ids)

    self.peer_ids = []
def ConfFiles(self, n = 2):
    """
    Add, verify and update n configuration files.  Each one is attached
    to a node from self.node_ids and a node group from
    self.nodegroup_ids, both picked at random, and the associations are
    checked.

    The API methods are called through self.debug() wrappers.  A None
    result from the add call is treated as a failed call and that
    iteration is skipped.

    Returns the list of ids of the configuration files that were added.
    """
    conf_file_ids = []
    # Build every wrapper up front so that GetConfFiles is always bound
    # for the final check, even when n is 0 or every AddConfFile fails.
    AddConfFile = self.debug(api.AddConfFile)
    GetConfFiles = self.debug(api.GetConfFiles)
    UpdateConfFile = self.debug(api.UpdateConfFile)
    AddConfFileToNode = self.debug(api.AddConfFileToNode)
    AddConfFileToNodeGroup = self.debug(api.AddConfFileToNodeGroup)

    for i in range(n):
        # Add conf file
        conf_file_fields = random_conf_file()
        conf_file_id = AddConfFile(auth, conf_file_fields)
        if conf_file_id is None:
            continue

        # Should return a unique id
        self.isunique(conf_file_id, conf_file_ids, 'AddConfFile - isunique')
        conf_file_ids.append(conf_file_id)

        # Check conf file
        conf_files = GetConfFiles(auth, [conf_file_id])
        if not conf_files:
            continue
        self.isequal(conf_files[0], conf_file_fields, 'AddConfFile - isequal')

        # Update conf file
        conf_file_fields = random_conf_file()
        UpdateConfFile(auth, conf_file_id, conf_file_fields)

        # Check again
        conf_files = GetConfFiles(auth, [conf_file_id])
        if not conf_files:
            continue
        self.isequal(conf_files[0], conf_file_fields, 'UpdateConfFile - isequal')

        # Add this conf file to a random node
        node_id = random.sample(self.node_ids, 1)[0]
        AddConfFileToNode(auth, conf_file_id, node_id)

        # Add this conf file to a random node group
        nodegroup_id = random.sample(self.nodegroup_ids, 1)[0]
        AddConfFileToNodeGroup(auth, conf_file_id, nodegroup_id)

        # Check node, nodegroup
        conf_files = GetConfFiles(auth, [conf_file_id], ['node_ids', 'nodegroup_ids'])
        if not conf_files:
            continue
        conf_file = conf_files[0]
        self.islistequal([node_id], conf_file['node_ids'], 'AddConfFileToNode - check')
        self.islistequal([nodegroup_id], conf_file['nodegroup_ids'], 'AddConfFileToNodeGroup - check')

    # Fetch everything that was added in one call and compare the id lists
    conf_files = GetConfFiles(auth, conf_file_ids)
    if conf_files is not None:
        self.islistequal(conf_file_ids, [c['conf_file_id'] for c in conf_files], 'GetConfFiles - isequal')

    if self.config.verbose:
        utils.header("Added conf_files: %s" % conf_file_ids)

    return conf_file_ids
def DeleteConfFiles(self):
    """
    Detach the first configuration file from one node and one node group
    by hand and check the result, then delete every configuration file
    in self.conf_file_ids and check that none can be fetched afterwards.

    Returns 0 early when the configuration files cannot be fetched;
    otherwise self.conf_file_ids is reset and nothing is returned.
    """
    GetConfFiles = self.debug(api.GetConfFiles)
    conf_files = GetConfFiles(auth, self.conf_file_ids)
    if not conf_files:
        return 0
    conf_file = conf_files[0]

    if conf_file['node_ids']:
        # Remove the conf file from one randomly chosen node
        node_id = random.sample(conf_file['node_ids'], 1)[0]
        DeleteConfFileFromNode = self.debug(api.DeleteConfFileFromNode)
        DeleteConfFileFromNode(auth, conf_file['conf_file_id'], node_id)

    if conf_file['nodegroup_ids']:
        # Remove the conf file from one randomly chosen node group,
        # using the node-group method rather than the node one
        nodegroup_id = random.sample(conf_file['nodegroup_ids'], 1)[0]
        DeleteConfFileFromNodeGroup = self.debug(api.DeleteConfFileFromNodeGroup)
        DeleteConfFileFromNodeGroup(auth, conf_file['conf_file_id'], nodegroup_id)

    # Check node, nodegroup.  The id is wrapped in a list, like every
    # other GetConfFiles lookup in this file.
    conf_files = GetConfFiles(auth, [conf_file['conf_file_id']], ['node_ids', 'nodegroup_ids'])
    if not conf_files:
        return 0
    conf_file = conf_files[0]
    self.islistequal([], conf_file['node_ids'], 'DeleteConfFileFromNode - check')
    self.islistequal([], conf_file['nodegroup_ids'], 'DeleteConfFileFromNodeGroup - check')

    DeleteConfFile = self.debug(api.DeleteConfFile)
    for conf_file_id in self.conf_file_ids:
        DeleteConfFile(auth, conf_file_id)

    # Check if conf files are deleted
    conf_files = GetConfFiles(auth, self.conf_file_ids)
    self.islistequal(conf_files, [], 'DeleteConfFile - check')

    if self.config.verbose:
        utils.header("Deleted conf_files: %s" % self.conf_file_ids)

    self.conf_file_ids = []
if __name__ == '__main__':
    # Script entry point: pass every command-line argument after the
    # script name to a freshly built api_unit_test, which is then called.
    args = tuple(sys.argv)[1:]
    api_unit_test()(*args)