revert accidental (too early) checkin
[plcapi.git] / Test.py
1 #!/usr/bin/python
2 #
3 # Test script example
4 #
5 # Mark Huang <mlhuang@cs.princeton.edu>
6 # Copyright (C) 2006 The Trustees of Princeton University
7 #
8 # $Id: Test.py,v 1.16 2007/01/09 16:13:36 mlhuang Exp $
9 #
10
11 from pprint import pprint
12 from string import letters, digits, punctuation
13 from traceback import print_exc
14 from optparse import OptionParser
15 import base64
16 import os
17
18 from random import Random
19 random = Random()
20
21 from PLC.Shell import Shell
22 shell = Shell(globals())
23
def randfloat(min = 0.0, max = 1.0):
    """
    Return a random float between min and max.

    Both bounds are converted with float(); the result is min plus a
    random fraction (from random.random()) of the distance to max.
    """

    low = float(min)
    span = float(max) - low
    return low + (random.random() * span)
26
def randint(min = 0, max = 1):
    """
    Return a random integer between min and max, both inclusive.

    The value is drawn as a float in the range [min, max + 1) and rounded
    down. Rounding down (rather than truncating toward zero with int())
    matters for negative ranges: truncation would make min almost
    unreachable and could return max + 1 (e.g. 0 for randint(-5, -1)).
    """

    from math import floor

    low = float(min)
    span = float(max + 1) - low
    value = int(floor(low + (random.random() * span)))

    # Floating point rounding can, very rarely, land exactly on max + 1.
    # Only clamp well-formed ranges so that degenerate calls (max < min)
    # keep returning what they always did.
    if min <= max < value:
        value = int(max)

    return value
29
# Pools of characters used to build random strings.
#
# See "2.2 Characters" in the XML specification:
#
# #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD]
# avoiding
# [#x7F-#x84], [#x86-#x9F], [#xFDD0-#xFDDF]
#
# NOTE(review): range() excludes its upper bound, so the "- 1" terms below
# also drop the last code point of each span (0x7E, 0xFDCF, ...), and
# range(0x84 + 1, 0x86 - 1) is empty, so 0x85 is never generated. The
# pools are therefore slightly smaller than the specification allows;
# confirm whether that is intentional before widening them.

# Tab, newline, carriage return and printable ASCII
ascii_xml_chars = map(unichr,
                      [0x9, 0xA, 0xD] +
                      range(0x20, 0x7F - 1))

# ASCII plus the upper half of the 8-bit range
low_xml_chars = ascii_xml_chars + map(unichr,
                                      range(0x84 + 1, 0x86 - 1) +
                                      range(0x9F + 1, 0xFF))

# Everything above, plus the rest of the Basic Multilingual Plane
valid_xml_chars = low_xml_chars + map(unichr,
                                      range(0xFF + 1, 0xD7FF) +
                                      range(0xE000, 0xFDD0 - 1) +
                                      range(0xFDDF + 1, 0xFFFD))
45
def randstr(length, pool = valid_xml_chars, encoding = "utf-8"):
    """
    Return a random unicode string built from characters in pool whose
    size, once encoded with the given encoding, is exactly length bytes.

    Characters are added or removed until the encoded size matches, so
    the number of characters may be smaller than length when the pool
    contains multi-byte characters.
    """

    chars = random.sample(pool, min(length, len(pool)))
    while True:
        text = u''.join(chars)
        size = len(text.encode(encoding))
        if size == length:
            return text
        if size > length:
            # Too long: drop one character and measure again
            chars.pop()
        else:
            # Too short: draw more characters and mix them in
            chars += random.sample(pool, min(length - size, len(pool)))
            random.shuffle(chars)
59
def randhostname():
    """
    Return a random three-part hostname.
    """

    # 1. Each part begins and ends with a letter or number.
    # 2. Each part except the last can contain letters, numbers, or hyphens.
    # 3. Each part is between 1 and 64 characters, including the trailing dot.
    # 4. At least two parts.
    # 5. Last part can only contain between 2 and 6 letters.
    label_chars = letters + digits + '-'

    # The fixed first/last characters guarantee rule 1 for the random middle
    first = 'a' + randstr(61, label_chars) + '1.'
    second = 'b' + randstr(61, label_chars) + '2.'
    last = 'c' + randstr(5, letters)

    return first + second + last
70
def randpath(length):
    """
    Return a random path of 1 to 10 components joined with os.sep and
    cut off after length characters.

    Each component is a random string of 1 to 30 bytes drawn from
    ascii_xml_chars.
    """

    count = randint(1, 10)
    components = [randstr(randint(1, 30), ascii_xml_chars) for _ in range(count)]
    path = os.sep.join(components)
    return path[:length]
76
def randemail():
    """
    Return a random lower-case e-mail address: 100 random letters and
    digits, an "@", and a random hostname.
    """

    local_part = randstr(100, letters + digits)
    address = local_part + "@" + randhostname()
    return address.lower()
79
def randkey(bits = 2048):
    """
    Return a random string laid out like an SSH public key line:
    "<key type> <base64 data> <e-mail address>".

    The base64 data is just bits / 8 bytes of random UTF-8 encoded text,
    not real key material.
    """

    key_types = ["ssh-dss", "ssh-rsa"]
    key_type = random.sample(key_types, 1)[0]

    # Already a byte string once encoded, so it can be base64-encoded as is
    blob = randstr(bits / 8).encode("utf-8")

    fields = [key_type, base64.b64encode(blob), randemail()]
    return ' '.join(fields)
86
def random_site():
    """
    Return a dictionary of random site fields: name, abbreviated_name,
    login_base, latitude and longitude.
    """

    site_fields = {}
    site_fields['name'] = randstr(254)
    site_fields['abbreviated_name'] = randstr(50)
    site_fields['login_base'] = randstr(20, letters).lower()

    # Coordinates are truncated to three decimal places
    site_fields['latitude'] = int(randfloat(-90.0, 90.0) * 1000) / 1000.0
    site_fields['longitude'] = int(randfloat(-180.0, 180.0) * 1000) / 1000.0

    return site_fields
95             
def random_address_type():
    """
    Return a dictionary of random address type fields: name and
    description.
    """

    name = randstr(20)
    description = randstr(254)
    return {'name': name, 'description': description}
101
def random_address():
    """
    Return a dictionary of random address fields: line1, line2, line3,
    city, state, postalcode and country.
    """

    address_fields = {}

    # Free-form fields, each filled with 254 bytes of random text
    for field in ['line1', 'line2', 'line3', 'city', 'state']:
        address_fields[field] = randstr(254)

    address_fields['postalcode'] = randstr(64)
    address_fields['country'] = randstr(128)

    return address_fields
112
def random_person():
    """
    Return a dictionary of random account fields: first_name, last_name,
    email, bio, enabled and password.
    """

    person_fields = {}
    person_fields['first_name'] = randstr(128)
    person_fields['last_name'] = randstr(128)
    person_fields['email'] = randemail()
    person_fields['bio'] = randstr(254)
    person_fields['password'] = randstr(254)

    # Accounts are disabled by default
    person_fields['enabled'] = False

    return person_fields
123
def random_key(key_types = None):
    """
    Return a dictionary of random key fields: key_type and key.

    key_types is an optional list of key type names to choose from. When
    it is omitted, the key type is taken from the first word of the
    generated key, so that both fields agree.

    (This function used to read a module-level key_types list that is not
    defined anywhere in this file, and so raised NameError when called.)
    """

    key = randkey()

    if key_types:
        key_type = random.sample(key_types, 1)[0]
    else:
        # randkey() returns "<key type> <base64 data> <e-mail address>"
        # NOTE(review): confirm that the API accepts these key type names
        key_type = key.split(' ', 1)[0]

    return {
        'key_type': key_type,
        'key': key,
        }
129
def random_slice(login_base = None):
    """
    Return a dictionary of random slice fields: name, url and
    description.

    login_base is the prefix of the slice name. When it is omitted, a
    random one is made up the same way random_site() does.

    (This function used to read a global named site that is not defined
    anywhere in this file, and so raised NameError when called.)
    """

    if login_base is None:
        # NOTE(review): presumably the API expects the login_base of an
        # existing site here; callers should pass one when they have it.
        login_base = randstr(20, letters).lower()

    return {
        'name': login_base + "_" + randstr(11, letters).lower(),
        'url': "http://" + randhostname() + "/",
        'description': randstr(2048),
        }
136
class Test:
    """
    Adds, updates, queries and deletes randomly generated sites, address
    types, addresses and persons.

    The API functions called below (AddSite, GetSites, AuthCheck, ...)
    are not defined in this class; they are looked up as module globals
    (presumably installed by Shell(globals()) at the top of the file).

    check enables verification of every action with assertions; verbose
    enables a progress message after each step.
    """

    def __init__(self, check = True, verbose = True):
        self.check = check
        self.verbose = verbose

        # Identifiers of the objects added so far, used by the Delete*
        # methods to clean up
        self.site_ids = []
        self.address_type_ids = []
        self.address_ids = []
        self.person_ids = []

    def _check_fields(self, record, expected, ignore = ()):
        """
        Assert that record agrees with every field in expected, except
        for the field names listed in ignore.
        """

        for field in expected:
            if field not in ignore:
                assert record[field] == expected[field], field

    def run(self,
            sites = 100,
            address_types = 3,
            addresses = 2,
            persons = 1000,
            keys = 3):
        """
        Add the given number of sites, address types, addresses (per
        site) and persons, then clean up.

        Errors are printed rather than propagated, and cleanup() is
        always called before returning. keys is currently unused.
        """

        try:
            try:
                self.AddSites(sites)
                self.AddAddressTypes(address_types)
                self.AddAddresses(addresses)
                self.AddPersons(persons)
            except Exception:
                # Report the failure, then fall through to the cleanup
                print_exc()
        finally:
            self.cleanup()

    def cleanup(self):
        """
        Delete everything that was added, in reverse order of creation.
        """

        self.DeletePersons()
        self.DeleteAddresses()
        self.DeleteAddressTypes()
        self.DeleteSites()

    def AddSites(self, n = 3):
        """
        Add a number of random sites.
        """

        for i in range(n):
            # Add site
            site_fields = random_site()
            site_id = AddSite(site_fields)

            # Should return a unique site_id
            assert site_id not in self.site_ids
            self.site_ids.append(site_id)

            if self.check:
                # Check site
                site = GetSites([site_id])[0]
                self._check_fields(site, site_fields)

            # Update site
            site_fields = random_site()
            # XXX Currently cannot change login_base
            del site_fields['login_base']
            site_fields['max_slices'] = randint(1, 10)
            UpdateSite(site_id, site_fields)

            if self.check:
                # Check site again
                site = GetSites([site_id])[0]
                self._check_fields(site, site_fields)

        if self.check:
            sites = GetSites(self.site_ids)
            assert set(self.site_ids) == set([site['site_id'] for site in sites])

        if self.verbose:
            print("Added sites %s" % (self.site_ids,))

    def DeleteSites(self):
        """
        Delete any random sites we may have added.
        """

        for site_id in self.site_ids:
            DeleteSite(site_id)
            if self.check:
                assert not GetSites([site_id])

        if self.check:
            assert not GetSites(self.site_ids)

        if self.verbose:
            print("Deleted sites %s" % (self.site_ids,))

        self.site_ids = []

    def AddAddressTypes(self, n = 3):
        """
        Add a number of random address types.
        """

        for i in range(n):
            address_type_fields = random_address_type()
            address_type_id = AddAddressType(address_type_fields)

            # Should return a unique address_type_id
            assert address_type_id not in self.address_type_ids
            self.address_type_ids.append(address_type_id)

            if self.check:
                # Check address type
                address_type = GetAddressTypes([address_type_id])[0]
                self._check_fields(address_type, address_type_fields)

                # Update address type
                address_type_fields = random_address_type()
                UpdateAddressType(address_type_id, address_type_fields)

                # Check address type again
                address_type = GetAddressTypes([address_type_id])[0]
                self._check_fields(address_type, address_type_fields)

        if self.check:
            address_types = GetAddressTypes(self.address_type_ids)
            assert set(self.address_type_ids) == set([address_type['address_type_id'] for address_type in address_types])

        if self.verbose:
            print("Added address types %s" % (self.address_type_ids,))

    def DeleteAddressTypes(self):
        """
        Delete any random address types we may have added.
        """

        for address_type_id in self.address_type_ids:
            DeleteAddressType(address_type_id)
            if self.check:
                assert not GetAddressTypes([address_type_id])

        if self.check:
            assert not GetAddressTypes(self.address_type_ids)

        if self.verbose:
            print("Deleted address types %s" % (self.address_type_ids,))

        self.address_type_ids = []

    def AddAddresses(self, n = 3):
        """
        Add a number of random addresses to each site.
        """

        for site_id in self.site_ids:
            for i in range(n):
                address_fields = random_address()
                address_id = AddSiteAddress(site_id, address_fields)

                # Should return a unique address_id
                assert address_id not in self.address_ids
                self.address_ids.append(address_id)

                if self.check:
                    # Check address
                    address = GetAddresses([address_id])[0]
                    self._check_fields(address, address_fields)

                    # Update address
                    address_fields = random_address()
                    UpdateAddress(address_id, address_fields)

                    # Check address again
                    address = GetAddresses([address_id])[0]
                    self._check_fields(address, address_fields)

                # Add address types
                for address_type_id in self.address_type_ids:
                    AddAddressTypeToAddress(address_type_id, address_id)

        if self.check:
            addresses = GetAddresses(self.address_ids)
            assert set(self.address_ids) == set([address['address_id'] for address in addresses])
            for address in addresses:
                assert set(self.address_type_ids) == set(address['address_type_ids'])

        if self.verbose:
            print("Added addresses %s" % (self.address_ids,))

    def DeleteAddresses(self):
        """
        Delete any random addresses we may have added.
        """

        # Delete site addresses
        for address_id in self.address_ids:
            # Remove address types
            for address_type_id in self.address_type_ids:
                DeleteAddressTypeFromAddress(address_type_id, address_id)

            if self.check:
                address = GetAddresses([address_id])[0]
                assert not address['address_type_ids']

            DeleteAddress(address_id)
            if self.check:
                assert not GetAddresses([address_id])

        if self.check:
            assert not GetAddresses(self.address_ids)

        if self.verbose:
            print("Deleted addresses %s" % (self.address_ids,))

        self.address_ids = []

    def AddPersons(self, n = 3):
        """
        Add a number of random users, each with a random set of roles
        and a random set of the sites added so far.
        """

        # Map role names to role identifiers
        roles = dict([(role['name'], role['role_id']) for role in GetRoles()])

        for i in range(n):

            # Add account
            person_fields = random_person()
            person_id = AddPerson(person_fields)

            # Should return a unique person_id
            assert person_id not in self.person_ids
            self.person_ids.append(person_id)

            if self.check:
                # Check account (the password is not compared)
                person = GetPersons([person_id])[0]
                self._check_fields(person, person_fields, ignore = ('password',))

                # Update account
                person_fields = random_person()
                UpdatePerson(person_id, person_fields)

                # Check account again
                person = GetPersons([person_id])[0]
                self._check_fields(person, person_fields, ignore = ('password',))

            auth = {'AuthMethod': "password",
                    'Username': person_fields['email'],
                    'AuthString': person_fields['password']}

            if self.check:
                # Check that account is disabled. A failure raised by
                # AuthCheck() itself counts as "not authenticated", but
                # the assertion must stay outside the try block: it used
                # to be swallowed together with such failures, so this
                # check could never fail.
                try:
                    authenticated = AuthCheck(auth)
                except Exception:
                    authenticated = False
                assert not authenticated

            # Add random set of roles
            person_roles = random.sample(['user', 'pi', 'tech'], randint(1, 3))
            for person_role in person_roles:
                role_id = roles[person_role]
                AddRoleToPerson(role_id, person_id)

            if self.check:
                person = GetPersons([person_id])[0]
                assert set(person_roles) == set(person['roles'])

            # Enable account
            UpdatePerson(person_id, {'enabled': True})

            if self.check:
                # Check that account is enabled
                assert AuthCheck(auth)

            # Without any sites there is nothing to associate the account
            # with (sampling from an empty list would raise ValueError)
            if not self.site_ids:
                continue

            # Associate account with random set of sites
            person_site_ids = []
            for site_id in random.sample(self.site_ids, randint(1, len(self.site_ids))):
                AddPersonToSite(person_id, site_id)
                person_site_ids.append(site_id)

            if self.check:
                # Make sure it really did it
                person = GetPersons([person_id])[0]
                assert set(person_site_ids) == set(person['site_ids'])

            # Set a primary site
            primary_site_id = random.choice(person_site_ids)
            SetPersonPrimarySite(person_id, primary_site_id)

            if self.check:
                # The primary site is expected to be listed first
                person = GetPersons([person_id])[0]
                assert person['site_ids'][0] == primary_site_id

        if self.verbose:
            print("Added users %s" % (self.person_ids,))

    def DeletePersons(self):
        """
        Delete any random users we may have added, after removing them
        from all sites, revoking their roles and disabling them.
        """

        # Delete users
        for person_id in self.person_ids:
            # Remove from each site
            for site_id in self.site_ids:
                DeletePersonFromSite(person_id, site_id)

            if self.check:
                person = GetPersons([person_id])[0]
                assert not person['site_ids']

            # Revoke roles
            person = GetPersons([person_id])[0]
            for role_id in person['role_ids']:
                DeleteRoleFromPerson(role_id, person_id)

            if self.check:
                person = GetPersons([person_id])[0]
                assert not person['role_ids']

            # Disable account
            UpdatePerson(person_id, {'enabled': False})

            if self.check:
                person = GetPersons([person_id])[0]
                assert not person['enabled']

            # Delete account
            DeletePerson(person_id)

            if self.check:
                assert not GetPersons([person_id])

        if self.check:
            assert not GetPersons(self.person_ids)

        if self.verbose:
            print("Deleted users %s" % (self.person_ids,))

        self.person_ids = []
477
if __name__ == "__main__":
    # Command line driver: parse the options, run the test once and,
    # unless --populate was given, clean up afterwards.
    parser = OptionParser()
    parser.add_option("-c", "--check",
                      action = "store_true", default = False,
                      help = "Verify actions (default: %default)")
    parser.add_option("-q", "--quiet",
                      action = "store_true", default = False,
                      help = "Be quiet (default: %default)")
    parser.add_option("-p", "--populate",
                      action = "store_true", default = False,
                      help = "Do not cleanup (default: %default)")
    options, args = parser.parse_args()

    be_verbose = not options.quiet
    test = Test(check = options.check, verbose = be_verbose)
    test.run()

    # NOTE(review): Test.run() already calls cleanup() in its finally
    # clause, so --populate does not appear to leave anything behind;
    # confirm the intended behavior.
    keep_data = options.populate
    if not keep_data:
        test.cleanup()