5 # Mark Huang <mlhuang@cs.princeton.edu>
6 # Copyright (C) 2006 The Trustees of Princeton University
8 # $Id: Test.py,v 1.18 2007/01/09 16:22:49 mlhuang Exp $
11 from pprint import pprint
12 from string import letters, digits, punctuation
13 from traceback import print_exc
14 from optparse import OptionParser
20 from Config import Config
21 from logger import log
22 from random import Random
# Reference data fetched once from the API when the module is loaded; the
# random_*() generators draw their enumerated field values from these.
boot_states = api.GetBootStates(auth)
# Map role name -> role_id (instead of a bare list of names) so that a role
# id can be looked up by name and random_attribute_type() can pick a random
# id with roles.values(). Iterating the dict still yields the role names.
# NOTE(review): assumes every GetRoles() record carries 'name' and 'role_id'.
roles = dict([(role['name'], role['role_id']) for role in api.GetRoles(auth)])
methods = api.GetNetworkMethods(auth)
types = api.GetNetworkTypes(auth)
#from PLC.Shell import Shell
35 #shell = Shell(globals())
def randfloat(min = 0.0, max = 1.0):
    """
    Return a random float between min and max.

    Both bounds are coerced with float(). The result is min plus a
    fraction of the (max - min) span, the fraction being taken from
    random.random().
    """
    low = float(min)
    fraction = random.random()
    span = float(max) - low
    return low + (fraction * span)
def randint(min = 0, max = 1):
    """
    Return a random integer between min and max, inclusive.

    The draw is made with randfloat() over min .. max + 1 and rounded
    down. Rounding with floor rather than int() matters for negative
    values: int() truncates toward zero, which maps every draw in (-1, 1)
    to 0 and can return a value above max (randint(-2, -2) gave -1).
    """
    # Imported locally so the helper does not rely on a file-level import.
    import math

    return int(math.floor(randfloat(min, max + 1)))
43 # See "2.2 Characters" in the XML specification:
45 # #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD]
47 # [#x7F-#x84], [#x86-#x9F], [#xFDD0-#xFDDF]
def _xml_char_range(first, last):
    """Return the unicode characters with code points first..last, inclusive."""
    # xrange() excludes its stop value, hence the + 1. The previous code
    # subtracted 1 from the stop as if it were inclusive, which silently
    # dropped the top of every range (and all of #x85).
    return map(unichr, xrange(first, last + 1))

# Tab, line feed and carriage return, then printable ASCII up to '~'.
ascii_xml_chars = map(unichr, [0x9, 0xA, 0xD])
ascii_xml_chars += _xml_char_range(0x20, 0x7E)

# Adds the rest of the first 256 code points, skipping the discouraged
# ranges [#x7F-#x84] and [#x86-#x9F].
low_xml_chars = list(ascii_xml_chars)
low_xml_chars += _xml_char_range(0x85, 0x85)
low_xml_chars += _xml_char_range(0xA0, 0xFF)

# Adds everything else up to #xFFFD, skipping [#xD800-#xDFFF] (not valid
# XML characters) and the discouraged range [#xFDD0-#xFDDF].
valid_xml_chars = list(low_xml_chars)
valid_xml_chars += _xml_char_range(0x100, 0xD7FF)
valid_xml_chars += _xml_char_range(0xE000, 0xFDCF)
valid_xml_chars += _xml_char_range(0xFDE0, 0xFFFD)
60 def randstr(length, pool = valid_xml_chars, encoding = "utf-8"):
61 sample = random.sample(pool, min(length, len(pool)))
64 bytes = len(s.encode(encoding))
68 sample += random.sample(pool, min(length - bytes, len(pool)))
69 random.shuffle(sample)
75 # 1. Each part begins and ends with a letter or number.
76 # 2. Each part except the last can contain letters, numbers, or hyphens.
77 # 3. Each part is between 1 and 64 characters, including the trailing dot.
78 # 4. At least two parts.
79 # 5. Last part can only contain between 2 and 6 letters.
80 hostname = 'a' + randstr(61, letters + digits + '-') + '1.' + \
81 'b' + randstr(61, letters + digits + '-') + '2.' + \
82 'c' + randstr(5, letters)
87 for i in range(randint(1, 10)):
88 parts.append(randstr(randint(1, 30), ascii_xml_chars))
89 return os.sep.join(parts)[0:length]
92 return (randstr(100, letters + digits) + "@" + randhostname()).lower()
94 def randkey(bits = 2048):
95 key_types = ["ssh-dss", "ssh-rsa"]
96 key_type = random.sample(key_types, 1)[0]
97 return ' '.join([key_type,
98 base64.b64encode(''.join(randstr(bits / 8).encode("utf-8"))),
103 'name': randstr(254),
104 'abbreviated_name': randstr(50),
105 'login_base': randstr(20, letters).lower(),
106 'latitude': int(randfloat(-90.0, 90.0) * 1000) / 1000.0,
107 'longitude': int(randfloat(-180.0, 180.0) * 1000) / 1000.0,
110 def random_address_type():
113 'description': randstr(254),
116 def random_address():
118 'line1': randstr(254),
119 'line2': randstr(254),
120 'line3': randstr(254),
121 'city': randstr(254),
122 'state': randstr(254),
123 'postalcode': randstr(64),
124 'country': randstr(128),
129 'first_name': randstr(128),
130 'last_name': randstr(128),
131 'email': randemail(),
133 # Accounts are disabled by default
135 'password': randstr(254),
140 'key_type': random.sample(key_types, 1)[0],
146 'name': site['login_base'] + "_" + randstr(11, letters).lower(),
147 'url': "http://" + randhostname() + "/",
148 'description': randstr(2048),
151 def random_nodegroup():
154 'description': randstr(200),
159 'hostname': randhostname(),
160 'boot_state': random.sample(boot_states, 1)[0],
161 'model': randstr(255),
162 'version': randstr(64),
165 def random_nodenetwork():
166 nodenetwork_fields = {
167 'method': random.sample(methods, 1)[0],
168 'type': random.sample(types, 1)[0],
169 'bwlimit': randint(500000, 10000000),
173 ip = randint(0, 0xffffffff)
174 netmask = (0xffffffff << randint(2, 31)) & 0xffffffff
175 network = ip & netmask
176 broadcast = ((ip & netmask) | ~netmask) & 0xffffffff
177 gateway = randint(network + 1, broadcast - 1)
178 dns1 = randint(0, 0xffffffff)
180 for field in 'ip', 'netmask', 'network', 'broadcast', 'gateway', 'dns1':
181 nodenetwork_fields[field] = socket.inet_ntoa(struct.pack('>L', locals()[field]))
183 return nodenetwork_fields
187 'hostname': randhostname(),
188 'ip': socket.inet_ntoa(struct.pack('>L', randint(0, 0xffffffff))),
189 'protocol': randstr(16),
190 'username': randstr(254),
191 'password': randstr(254),
192 'notes': randstr(254),
193 'model': randstr(32),
196 def random_conf_file():
198 'enabled': bool(randint()),
199 'source': randpath(255),
200 'dest': randpath(255),
201 'file_permissions': "%#o" % randint(0, 512),
202 'file_owner': randstr(32, letters + '_' + digits),
203 'file_group': randstr(32, letters + '_' + digits),
204 'preinstall_cmd': randpath(100),
205 'postinstall_cmd': randpath(100),
206 'error_cmd': randpath(100),
207 'ignore_cmd_errors': bool(randint()),
208 'always_update': bool(randint()),
211 def random_attribute_type():
213 'name': randstr(100),
214 'description': randstr(254),
215 'min_role_id': random.sample(roles.values(), 1)[0],
def isequal(object_fields, expected_fields):
    """
    Check that every field in expected_fields is present in object_fields
    with an equal value.

    Fields of object_fields that are not listed in expected_fields are
    ignored. Returns nothing; raises AssertionError naming the offending
    field on the first missing or unequal field.
    """
    for field in expected_fields:
        # Raise explicitly instead of using assert: the checks must still
        # run under "python -O", and the message says which field failed.
        if field not in object_fields:
            raise AssertionError("missing field %r" % (field,))
        actual = object_fields[field]
        expected = expected_fields[field]
        if not (actual == expected):
            raise AssertionError("field %r: expected %r, got %r" %
                                 (field, expected, actual))
def islistequal(list1, list2):
    """
    Check that two lists hold the same elements, ignoring order and
    duplicates.

    Returns nothing; raises AssertionError showing both lists when they
    differ.
    """
    try:
        same = set(list1) == set(list2)
    except TypeError:
        # Unhashable elements (e.g. the dict records the delete checks pass
        # in from the Get* calls) cannot go in a set; fall back to checking
        # that each list contains every element of the other.
        items1, items2 = list(list1), list(list2)
        same = (not [item for item in items1 if item not in items2] and
                not [item for item in items2 if item not in items1])
    if not same:
        raise AssertionError("lists differ: %r != %r" % (list1, list2))
def isunique(id, id_list):
    """
    Check that id does not already appear in id_list.

    Returns nothing; raises AssertionError naming the duplicate id
    otherwise.
    """
    # Explicit raise so the check survives "python -O" and reports the id.
    if id in id_list:
        raise AssertionError("id %r is not unique" % (id,))
229 def get_methods(self, prefix):
230 method_list = filter(lambda name: name.startswith(prefix), self.methods)
235 # test_type: (Method, arguments, method_result_holder, number_to_add)
236 {'add': (api.AddSite, random_site(), site_ids, 3),
237 'add_check': (isunique,
243 Template class for testing Add methods
245 def __init__(self, auth, add_method, update_method, get_method, delete_method, primary_key):
247 self.methods_tested = []
248 for method in [add_method, update_method, get_method, delete_method]:
249 self.methods_tested.append(method._Method_name)
252 self.Add = log(add_method)
253 self.Update = log(update_method)
254 self.Get = log(get_method)
255 self.Delete = log(delete_method)
256 self.primary_key = primary_key
def test(self, args_method, num):
    """
    Add, check, update and re-check num random objects.

    args_method is called with no arguments to produce the field dict for
    each Add call and again for each Update call. After the loop, all
    objects added so far are fetched in one Get call and their primary
    keys compared with the recorded ids.

    Returns self.object_ids.
    """
    # NOTE(review): assumes __init__ sets self.auth and initialises
    # self.object_ids to a list -- confirm.
    for i in range(num):
        # Add object
        object_fields = args_method()
        object_id = self.Add(self.auth, object_fields)

        # Should return a unique id
        AddCheck = log(isunique, 'Unique Check')
        AddCheck(object_id, self.object_ids)
        self.object_ids.append(object_id)

        # Check object
        record = self.Get(self.auth, [object_id])[0]
        CheckObject = log(isequal, 'Add Check')
        CheckObject(record, object_fields)

        # Update object
        object_fields = args_method()
        self.Update(self.auth, object_id, object_fields)

        # Check object again
        record = self.Get(self.auth, [object_id])[0]
        CheckObject = log(isequal, 'Update Check')
        CheckObject(record, object_fields)

    # Check Get all objects
    records = self.Get(self.auth, self.object_ids)
    CheckObjects = log(islistequal, 'Get Check')
    CheckObjects(self.object_ids,
                 [record[self.primary_key] for record in records])

    return self.object_ids
296 for object_id in sefl.object_ids:
297 self.Delete(self.auth, object_id)
299 # Check if objects are deleted
300 CheckObjects = log(islistequal, 'Delete Check')
301 ChecObjects(api.Get(auth, self.object_ids), [])
306 def __init__(self, config, verbose = True):
308 self.verbose = verbose
309 self.methods = set(api.system.listMethods())
310 self.methods_tested = set()
313 self.address_type_ids = []
314 self.address_ids = []
326 site_test = Entity(auth, api.AddSite, api.UpdateSite, api.GetSite, api.DeleteSite, 'site_id') site_test.test(random_site, sites)
328 self.AddressTypes(address_types)
329 self.AddAddresses(addresses)
330 self.AddPersons(persons)
334 for method in set(self.methods).difference(self.methoods_tested):
335 print >> test_log, "%(method)s [Not Tested]" % locals()
339 self.DeleteAddresses()
340 self.DeleteAddressTypes()
345 def Sites(self, n = 1):
347 Add and Modify a random site.
352 site_fields = random_site()
353 AddSite = log(api.AddSite)
354 site_id = AddSite(auth, site_fields)
355 self.methods_tested.update(['AddSite'])
357 # Should return a unique site_id
358 CheckSite = log(isunique, 'Unique Check')
359 CheckSite(site_id, self.site_ids)
360 self.site_ids.append(site_id)
363 GetSites = log(api.GetSites)
364 site = GetSites(auth, [site_id])[0]
365 CheckSite = log(isequal, 'AddSite Check')
366 CheckSite(site, site_fields)
367 self.methods_tested.update(['GetSites'])
370 site_fields = random_site()
371 # XXX Currently cannot change login_base
372 del site_fields['login_base']
373 site_fields['max_slices'] = randint(1, 10)
374 UpdateSite = log(api.UpdateSite)
375 UpdateSite(auth, site_id, site_fields)
376 self.methods_tested.update(['UpdateSite'])
379 site = GetSites(auth, [site_id])[0]
380 CheckSite = log(isequal, 'UpdateSite Check')
381 CheckSite(site, site_fields)
384 # Check Get all sites
385 sites = GetSites(auth, self.site_ids)
386 CheckSite = log(islistequal, 'GetSites Check')
387 CheckSite(self.site_ids, [site['site_id'] for site in sites])
390 print "Added sites", self.site_ids
392 def DeleteSites(self):
394 Delete any random sites we may have added.
397 DeleteSite = log(api.DeleteSite)
398 for site_id in self.site_ids:
399 DeleteSite(auth, site_id)
400 self.methods_tested.update(['DeleteSite'])
402 # Check if sites are deleted
403 CheckSite = log(islistequal, 'DeleteSite Check')
404 CheckSite(api.GetSites(auth, self.site_ids), [])
407 print "Deleted sites", self.site_ids
411 def AddAddressTypes(self, n = 3):
413 Add a number of random address types.
417 address_type_fields = random_address_type()
418 AddAddressType = log(api.AddAddressType)
419 address_type_id = AddAddressType(auth, address_type_fields)
420 self.methods_tested.update(['AddAddressType'])
422 # Should return a unique address_type_id
423 CheckAddressType = log(isunique, 'Unique Check')
424 CheckAddressType(address_type_id, self.address_type_ids)
425 self.address_type_ids.append(address_type_id)
428 GetAddressTypes = log(api.GetAddressTypes)
429 address_type = GetAddressTypes(auth, [address_type_id])[0]
430 CheckAddressType = log(isequal, 'AddAddressType Check')
431 CheckAddressType(address_type, address_type_fields)
433 # Update address type
434 address_type_fields = random_address_type()
435 UpdateAddressType = log(api.UpdateAddressType)
436 UpdateAddressType(auth, address_type_id, address_type_fields)
438 # Check address type again
439 address_type = GetAddressTypes([address_type_id])[0]
440 CheckAddressType = log(isequal, 'UpdateAddressType Check')
441 CheckAddressType(address_type, address_type_fields)
442 self.methods_tested.update(['UpdateAddressType'])
444 # Check get all address types
445 address_types = GetAddressTypes(auth, self.address_type_ids)
446 CheckAddressType = log(islistequal, 'GetAddressType Check')
447 CheckAddressType(self.address_type_ids,
448 [address_type['address_type_id' for address_type in address_types])
449 self.methods_tested.update(['GetAddressTypes'])
452 print "Added address types", self.address_type_ids
454 def DeleteAddressTypes(self):
456 Delete any random address types we may have added.
459 DeleteAddressType = log(api.DeleteAddressType)
460 for address_type_id in self.address_type_ids:
461 DeleteAddressType(auth, address_type_id)
462 self.methods_tested.update(['DeleteAddressType'])
464 CheckAddressType = log(islistequal, 'DeleteAddressType Check')
465 CheckAddressType(api.GetAddressTypes(auth, self.address_type_ids), [])
468 print "Deleted address types", self.address_type_ids
470 self.address_type_ids = []
472 def AddAddresses(self, n = 3):
474 Add a number of random addresses to each site.
477 for site_id in self.site_ids:
479 address_fields = random_address()
480 address_id = AddSiteAddress(site_id, address_fields)
482 # Should return a unique address_id
483 assert address_id not in self.address_ids
484 self.address_ids.append(address_id)
488 address = GetAddresses([address_id])[0]
489 for field in address_fields:
490 assert address[field] == address_fields[field]
493 address_fields = random_address()
494 UpdateAddress(address_id, address_fields)
496 # Check address again
497 address = GetAddresses([address_id])[0]
498 for field in address_fields:
499 assert address[field] == address_fields[field]
502 for address_type_id in self.address_type_ids:
503 AddAddressTypeToAddress(address_type_id, address_id)
506 addresses = GetAddresses(self.address_ids)
507 assert set(self.address_ids) == set([address['address_id'] for address in addresses])
508 for address in addresses:
509 assert set(self.address_type_ids) == set(address['address_type_ids'])
512 print "Added addresses", self.address_ids
514 def DeleteAddresses(self):
516 Delete any random addresses we may have added.
519 # Delete site addresses
520 for address_id in self.address_ids:
521 # Remove address types
522 for address_type_id in self.address_type_ids:
523 DeleteAddressTypeFromAddress(address_type_id, address_id)
526 address = GetAddresses([address_id])[0]
527 assert not address['address_type_ids']
529 DeleteAddress(address_id)
531 assert not GetAddresses([address_id])
534 assert not GetAddresses(self.address_ids)
537 print "Deleted addresses", self.address_ids
539 self.address_ids = []
541 def AddPersons(self, n = 3):
543 Add a number of random users to each site.
547 role_ids = [role['role_id'] for role in roles]
548 roles = [role['name'] for role in roles]
549 roles = dict(zip(roles, role_ids))
554 person_fields = random_person()
555 person_id = AddPerson(person_fields)
557 # Should return a unique person_id
558 assert person_id not in self.person_ids
559 self.person_ids.append(person_id)
563 person = GetPersons([person_id])[0]
564 for field in person_fields:
565 if field != 'password':
566 assert person[field] == person_fields[field]
569 person_fields = random_person()
570 UpdatePerson(person_id, person_fields)
572 # Check account again
573 person = GetPersons([person_id])[0]
574 for field in person_fields:
575 if field != 'password':
576 assert person[field] == person_fields[field]
578 auth = {'AuthMethod': "password",
579 'Username': person_fields['email'],
580 'AuthString': person_fields['password']}
583 # Check that account is disabled
585 assert not AuthCheck(auth)
589 # Add random set of roles
590 person_roles = random.sample(['user', 'pi', 'tech'], randint(1, 3))
591 for person_role in person_roles:
592 role_id = roles[person_role]
593 AddRoleToPerson(role_id, person_id)
596 person = GetPersons([person_id])[0]
597 assert set(person_roles) == set(person['roles'])
600 UpdatePerson(person_id, {'enabled': True})
603 # Check that account is enabled
604 assert AuthCheck(auth)
606 # Associate account with random set of sites
608 for site_id in random.sample(self.site_ids, randint(1, len(self.site_ids))):
609 AddPersonToSite(person_id, site_id)
610 person_site_ids.append(site_id)
613 # Make sure it really did it
614 person = GetPersons([person_id])[0]
615 assert set(person_site_ids) == set(person['site_ids'])
618 primary_site_id = random.sample(person_site_ids, randint(1, len(person_site_ids)))[0]
619 SetPersonPrimarySite(person_id, primary_site_id)
622 person = GetPersons([person_id])[0]
623 assert person['site_ids'][0] == primary_site_id
626 print "Added users", self.person_ids
628 def DeletePersons(self):
630 for person_id in self.person_ids:
631 # Remove from each site
632 for site_id in self.site_ids:
633 DeletePersonFromSite(person_id, site_id)
636 person = GetPersons([person_id])[0]
637 assert not person['site_ids']
640 person = GetPersons([person_id])[0]
641 for role_id in person['role_ids']:
642 DeleteRoleFromPerson(role_id, person_id)
645 person = GetPersons([person_id])[0]
646 assert not person['role_ids']
649 UpdatePerson(person_id, {'enabled': False})
652 person = GetPersons([person_id])[0]
653 assert not person['enabled']
656 DeletePerson(person_id)
659 assert not GetPersons([person_id])
662 assert not GetPersons(self.person_ids)
665 print "Deleted users", self.person_ids
669 if __name__ == "__main__":
670 parser = OptionParser()
671 parser.add_option("-c", "--check", action = "store_true", default = False, help = "Verify actions (default: %default)")
672 parser.add_option("-q", "--quiet", action = "store_true", default = False, help = "Be quiet (default: %default)")
673 parser.add_option("-p", "--populate", action = "store_true", default = False, help = "Do not cleanup (default: %default)")
674 (options, args) = parser.parse_args()
675 test = Test(check = options.check, verbose = not options.quiet)
677 if not options.populate: