5 # Mark Huang <mlhuang@cs.princeton.edu>
6 # Copyright (C) 2006 The Trustees of Princeton University
8 # $Id: Test.py,v 1.15 2006/10/30 16:38:33 mlhuang Exp $
11 from pprint import pprint
12 from string import letters, digits, punctuation
13 from traceback import print_exc
14 from optparse import OptionParser
18 from random import Random
21 from PLC.Shell import Shell
22 shell = Shell(globals())
def randfloat(min = 0.0, max = 1.0):
    """
    Return a random float between min and max.

    Both bounds are coerced with float(); a single draw from
    random.random() is scaled by the width of the interval and offset
    by the lower bound.
    """

    lower = float(min)
    width = float(max) - lower
    return lower + (random.random() * width)
def randint(min = 0, max = 1):
    """
    Return a random integer N such that min <= N <= max.

    Both bounds are inclusive and must be integers, with min <= max.

    The draw is delegated to the generator's own randint(). The earlier
    int(randfloat(min, max + 1)) truncated toward zero instead of
    flooring, so a range with a negative bound could produce a value
    above max (for instance 0 from randint(-3, -1)) and would almost
    never produce min.
    """

    return random.randint(min, max)
# See "2.2 Characters" in the XML specification. Legal characters are
#
# #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD]
#
# and of those, these ranges are avoided here:
#
# [#x7F-#x84], [#x86-#x9F], [#xFDD0-#xFDDF]
#
# xrange() excludes its stop value, so every stop below is written as
# "last wanted code point + 1". The earlier stops (0x7F - 1, 0x86 - 1,
# 0xFF, 0xD7FF, 0xFDD0 - 1, 0xFFFD) treated the stop as inclusive and
# silently left out #x7E, #x85, #xFF, #xD7FF, #xFDCF and #xFFFD.

# Tab, line feed, carriage return and printable ASCII (#x20-#x7E)
ascii_xml_chars = list(map(unichr, [0x9, 0xA, 0xD]))
ascii_xml_chars += map(unichr, xrange(0x20, 0x7E + 1))

# ASCII plus the permitted part of #x80-#xFF: #x85 and #xA0-#xFF
low_xml_chars = list(ascii_xml_chars)
low_xml_chars += map(unichr, xrange(0x84 + 1, 0x85 + 1))
low_xml_chars += map(unichr, xrange(0x9F + 1, 0xFF + 1))

# Everything above plus #x100-#xD7FF, #xE000-#xFDCF and #xFDE0-#xFFFD
valid_xml_chars = list(low_xml_chars)
valid_xml_chars += map(unichr, xrange(0xFF + 1, 0xD7FF + 1))
valid_xml_chars += map(unichr, xrange(0xE000, 0xFDCF + 1))
valid_xml_chars += map(unichr, xrange(0xFDDF + 1, 0xFFFD + 1))
# randstr: build a random string from the characters in pool.
#   length   -- target size; the code compares it with the encoded byte
#               count of the string, not with the number of characters
#   pool     -- sequence of candidate characters (default: valid_xml_chars)
#   encoding -- codec used to measure the encoded size of the string
# NOTE(review): the loop header, the assignment of s and the return
# statement are not among the lines shown here; confirm the details
# against the full function.
46 def randstr(length, pool = valid_xml_chars, encoding = "utf-8"):
# Initial draw without replacement, capped at the size of the pool
47 sample = random.sample(pool, min(length, len(pool)))
# Size of the candidate string once encoded
50 bytes = len(s.encode(encoding))
# Draw further characters to cover the missing bytes (presumably only
# reached when bytes < length -- TODO confirm); the sample is shuffled
# afterwards
54 sample += random.sample(pool, min(length - bytes, len(pool)))
55 random.shuffle(sample)
61 # 1. Each part begins and ends with a letter or number.
62 # 2. Each part except the last can contain letters, numbers, or hyphens.
63 # 3. Each part is between 1 and 64 characters, including the trailing dot.
64 # 4. At least two parts.
65 # 5. Last part can only contain between 2 and 6 letters.
66 hostname = 'a' + randstr(61, letters + digits + '-') + '1.' + \
67 'b' + randstr(61, letters + digits + '-') + '2.' + \
68 'c' + randstr(5, letters)
73 for i in range(randint(1, 10)):
74 parts.append(randstr(randint(1, 30), ascii_xml_chars))
75 return os.sep.join(parts)[0:length]
78 return (randstr(100, letters + digits) + "@" + randhostname()).lower()
# randkey: build a random, space-separated key string.
#   bits -- key size; bits / 8 is the length passed to randstr()
# The first field is "ssh-dss" or "ssh-rsa", picked at random; the second
# is the UTF-8 encoding of a random string, base64-encoded.
# NOTE(review): the rest of the joined list lies outside the visible lines.
80 def randkey(bits = 2048):
81 key_types = ["ssh-dss", "ssh-rsa"]
# sample() of size 1 is used here to pick a single random element
82 key_type = random.sample(key_types, 1)[0]
83 return ' '.join([key_type,
84 base64.b64encode(''.join(randstr(bits / 8).encode("utf-8"))),
90 'abbreviated_name': randstr(50),
91 'login_base': randstr(20, letters).lower(),
92 'latitude': int(randfloat(-90.0, 90.0) * 1000) / 1000.0,
93 'longitude': int(randfloat(-180.0, 180.0) * 1000) / 1000.0,
# random_address_type: random field values for an address type.
# NOTE(review): only the 'description' entry is visible here; callers in
# this file also read a 'name' field, so one is presumably generated too.
96 def random_address_type():
99 'description': randstr(254),
# random_address: random field values for an address. Every visible field
# is a random string from randstr(); the argument is the length requested
# for that field.
102 def random_address():
104 'line1': randstr(254),
105 'line2': randstr(254),
106 'line3': randstr(254),
107 'city': randstr(254),
108 'state': randstr(254),
109 'postalcode': randstr(64),
110 'country': randstr(128),
115 'first_name': randstr(128),
116 'last_name': randstr(128),
117 'email': randemail(),
119 # Accounts are disabled by default
121 'password': randstr(254),
126 'key_type': random.sample(key_types, 1)[0],
132 'name': site['login_base'] + "_" + randstr(11, letters).lower(),
133 'url': "http://" + randhostname() + "/",
134 'description': randstr(2048),
# __init__: set up the bookkeeping for a test run.
#   check   -- NOTE(review): not used in the visible lines; presumably
#              stored on the instance like verbose -- TODO confirm
#   verbose -- stored as self.verbose
# The *_ids lists start empty; the Add* methods append the ids of the
# objects they create and the Delete* methods iterate over them.
138 def __init__(self, check = True, verbose = True):
140 self.verbose = verbose
143 self.address_type_ids = []
144 self.address_ids = []
156 self.AddAddressTypes(address_types)
157 self.AddAddresses(addresses)
158 self.AddPersons(persons)
166 self.DeleteAddresses()
167 self.DeleteAddressTypes()
# AddSites: create random sites, update them, and verify both steps by
# reading the records back. Ids of created sites are appended to
# self.site_ids.
#   n -- number of sites to add (default 3). NOTE(review): the loop header
#        that consumes n is not among the visible lines.
170 def AddSites(self, n = 3):
172 Add a number of random sites.
# Create a site from freshly generated random fields
177 site_fields = random_site()
178 site_id = AddSite(site_fields)
180 # Should return a unique site_id
181 assert site_id not in self.site_ids
182 self.site_ids.append(site_id)
# Read the site back; every submitted field must come back unchanged
186 site = GetSites([site_id])[0]
187 for field in site_fields:
188 assert site[field] == site_fields[field]
# Generate a second set of random fields to exercise the update call
191 site_fields = random_site()
192 # XXX Currently cannot change login_base
193 del site_fields['login_base']
194 site_fields['max_slices'] = randint(1, 10)
195 UpdateSite(site_id, site_fields)
# Read back again; every updated field must match
199 site = GetSites([site_id])[0]
200 for field in site_fields:
201 assert site[field] == site_fields[field]
# A single query for all recorded ids must return exactly those sites
204 sites = GetSites(self.site_ids)
205 assert set(self.site_ids) == set([site['site_id'] for site in sites])
208 print "Added sites", self.site_ids
# DeleteSites: remove the sites recorded in self.site_ids and verify that
# they can no longer be retrieved.
# NOTE(review): the delete call itself is not among the visible lines.
210 def DeleteSites(self):
212 Delete any random sites we may have added.
215 for site_id in self.site_ids:
# A deleted site must not be returned when queried by its id
218 assert not GetSites([site_id])
# Nor may a query for all recorded ids return anything
221 assert not GetSites(self.site_ids)
224 print "Deleted sites", self.site_ids
# AddAddressTypes: create random address types, update them, and verify
# both steps by reading the records back. Ids of created address types are
# appended to self.address_type_ids.
#   n -- number of address types to add (default 3). NOTE(review): the
#        loop header that consumes n is not among the visible lines.
228 def AddAddressTypes(self, n = 3):
230 Add a number of random address types.
# Create an address type from freshly generated random fields
234 address_type_fields = random_address_type()
235 address_type_id = AddAddressType(address_type_fields)
237 # Should return a unique address_type_id
238 assert address_type_id not in self.address_type_ids
239 self.address_type_ids.append(address_type_id)
# Read it back; only 'name' and 'description' are compared
243 address_type = GetAddressTypes([address_type_id])[0]
244 for field in 'name', 'description':
245 assert address_type[field] == address_type_fields[field]
247 # Update address type
248 address_type_fields = random_address_type()
249 UpdateAddressType(address_type_id, address_type_fields)
251 # Check address type again
252 address_type = GetAddressTypes([address_type_id])[0]
253 for field in 'name', 'description':
254 assert address_type[field] == address_type_fields[field]
# A single query for all recorded ids must return exactly those types
257 address_types = GetAddressTypes(self.address_type_ids)
258 assert set(self.address_type_ids) == set([address_type['address_type_id'] for address_type in address_types])
261 print "Added address types", self.address_type_ids
# DeleteAddressTypes: delete every address type recorded in
# self.address_type_ids, verify that none can still be retrieved, then
# reset the list.
263 def DeleteAddressTypes(self):
265 Delete any random address types we may have added.
268 for address_type_id in self.address_type_ids:
269 DeleteAddressType(address_type_id)
# A deleted address type must not be returned when queried by its id
271 assert not GetAddressTypes([address_type_id])
# Nor may a query for all recorded ids return anything
274 assert not GetAddressTypes(self.address_type_ids)
277 print "Deleted address types", self.address_type_ids
# Forget the ids so a later call does not try to delete them again
279 self.address_type_ids = []
# AddAddresses: for each site in self.site_ids, create random addresses,
# update them, tag them with every address type in self.address_type_ids,
# and verify each step by reading the records back. Ids of created
# addresses are appended to self.address_ids.
#   n -- number of addresses per site (default 3). NOTE(review): the loop
#        header that consumes n is not among the visible lines.
281 def AddAddresses(self, n = 3):
283 Add a number of random addresses to each site.
286 for site_id in self.site_ids:
# Create an address on this site from freshly generated random fields
288 address_fields = random_address()
289 address_id = AddSiteAddress(site_id, address_fields)
291 # Should return a unique address_id
292 assert address_id not in self.address_ids
293 self.address_ids.append(address_id)
# Read the address back; every submitted field must come back unchanged
297 address = GetAddresses([address_id])[0]
298 for field in address_fields:
299 assert address[field] == address_fields[field]
# Generate a second set of random fields to exercise the update call
302 address_fields = random_address()
303 UpdateAddress(address_id, address_fields)
305 # Check address again
306 address = GetAddresses([address_id])[0]
307 for field in address_fields:
308 assert address[field] == address_fields[field]
# Tag the address with every address type created so far
311 for address_type_id in self.address_type_ids:
312 AddAddressTypeToAddress(address_type_id, address_id)
# A single query must return exactly the recorded addresses, each one
# carrying the full set of address types
315 addresses = GetAddresses(self.address_ids)
316 assert set(self.address_ids) == set([address['address_id'] for address in addresses])
317 for address in addresses:
318 assert set(self.address_type_ids) == set(address['address_type_ids'])
321 print "Added addresses", self.address_ids
# DeleteAddresses: for every address recorded in self.address_ids, detach
# all known address types, delete the address, and verify each step; the
# list of recorded ids is reset at the end.
323 def DeleteAddresses(self):
325 Delete any random addresses we may have added.
328 # Delete site addresses
329 for address_id in self.address_ids:
330 # Remove address types
331 for address_type_id in self.address_type_ids:
332 DeleteAddressTypeFromAddress(address_type_id, address_id)
# After detaching, the address must report no address types at all
335 address = GetAddresses([address_id])[0]
336 assert not address['address_type_ids']
338 DeleteAddress(address_id)
# A deleted address must not be returned when queried by its id
340 assert not GetAddresses([address_id])
# Nor may a query for all recorded ids return anything
343 assert not GetAddresses(self.address_ids)
346 print "Deleted addresses", self.address_ids
# Forget the ids so a later call does not try to delete them again
348 self.address_ids = []
# AddPersons: create random user accounts, update them, give them roles,
# enable them, attach them to sites and set a primary site, verifying each
# step by reading the account back. Ids of created accounts are appended
# to self.person_ids.
#   n -- number of accounts to add (default 3). NOTE(review): the loop
#        header that consumes n is not among the visible lines.
# NOTE(review): the docstring says "to each site", but the visible code
# attaches each account to a random subset of self.site_ids.
350 def AddPersons(self, n = 3):
352 Add a number of random users to each site.
# Turn the list of role records into a name -> role_id lookup
# NOTE(review): the statement that fetches roles is not visible here
356 role_ids = [role['role_id'] for role in roles]
357 roles = [role['name'] for role in roles]
358 roles = dict(zip(roles, role_ids))
# Create an account from freshly generated random fields
363 person_fields = random_person()
364 person_id = AddPerson(person_fields)
366 # Should return a unique person_id
367 assert person_id not in self.person_ids
368 self.person_ids.append(person_id)
# Read the account back; every submitted field except the password is
# compared with what was sent
372 person = GetPersons([person_id])[0]
373 for field in person_fields:
374 if field != 'password':
375 assert person[field] == person_fields[field]
# Generate a second set of random fields to exercise the update call
378 person_fields = random_person()
379 UpdatePerson(person_id, person_fields)
381 # Check account again
382 person = GetPersons([person_id])[0]
383 for field in person_fields:
384 if field != 'password':
385 assert person[field] == person_fields[field]
# Password credentials for the account, built from the updated fields
387 auth = {'AuthMethod': "password",
388 'Username': person_fields['email'],
389 'AuthString': person_fields['password']}
392 # Check that account is disabled
394 assert not AuthCheck(auth)
398 # Add random set of roles
399 person_roles = random.sample(['user', 'pi', 'tech'], randint(1, 3))
400 for person_role in person_roles:
401 role_id = roles[person_role]
402 AddRoleToPerson(role_id, person_id)
# The account must now report exactly the roles that were granted
405 person = GetPersons([person_id])[0]
406 assert set(person_roles) == set(person['roles'])
# Enable the account so that the same credentials are accepted below
409 UpdatePerson(person_id, {'enabled': True})
412 # Check that account is enabled
413 assert AuthCheck(auth)
415 # Associate account with random set of sites
417 for site_id in random.sample(self.site_ids, randint(1, len(self.site_ids))):
418 AddPersonToSite(person_id, site_id)
419 person_site_ids.append(site_id)
422 # Make sure it really did it
423 person = GetPersons([person_id])[0]
424 assert set(person_site_ids) == set(person['site_ids'])
# Pick one of the account's sites as primary: a random-sized sample is
# drawn, but only its first element is used
427 primary_site_id = random.sample(person_site_ids, randint(1, len(person_site_ids)))[0]
428 SetPersonPrimarySite(person_id, primary_site_id)
# The primary site is expected to be listed first in site_ids
431 person = GetPersons([person_id])[0]
432 assert person['site_ids'][0] == primary_site_id
435 print "Added users", self.person_ids
# DeletePersons: for every account recorded in self.person_ids, remove it
# from all sites, strip its roles, disable it and delete it, verifying
# each step by reading the account back.
437 def DeletePersons(self):
439 for person_id in self.person_ids:
440 # Remove from each site
# Every site in self.site_ids is tried, not only the account's own sites
441 for site_id in self.site_ids:
442 DeletePersonFromSite(person_id, site_id)
# The account must no longer belong to any site
445 person = GetPersons([person_id])[0]
446 assert not person['site_ids']
# Remove whatever roles the account currently reports
449 person = GetPersons([person_id])[0]
450 for role_id in person['role_ids']:
451 DeleteRoleFromPerson(role_id, person_id)
# The account must no longer hold any role
454 person = GetPersons([person_id])[0]
455 assert not person['role_ids']
# Disable the account and confirm the flag was cleared
458 UpdatePerson(person_id, {'enabled': False})
461 person = GetPersons([person_id])[0]
462 assert not person['enabled']
465 DeletePerson(person_id)
# A deleted account must not be returned when queried by its id
468 assert not GetPersons([person_id])
# Nor may a query for all recorded ids return anything
471 assert not GetPersons(self.person_ids)
474 print "Deleted users", self.person_ids
478 if __name__ == "__main__":
479 parser = OptionParser()
480 parser.add_option("-c", "--check", action = "store_true", default = False, help = "Verify actions (default: %default)")
481 parser.add_option("-q", "--quiet", action = "store_true", default = False, help = "Be quiet (default: %default)")
482 parser.add_option("-p", "--populate", action = "store_true", default = False, help = "Do not cleanup (default: %default)")
483 (options, args) = parser.parse_args()
484 test = Test(check = options.check, verbose = not options.quiet)
486 if not options.populate: