initial checkin of qa api
[tests.git] / qaapi / TestAPI.py
1 #!/usr/bin/python
2 #
3 # Test script example
4 #
5 # Mark Huang <mlhuang@cs.princeton.edu>
6 # Copyright (C) 2006 The Trustees of Princeton University
7 #
8 # $Id: Test.py,v 1.18 2007/01/09 16:22:49 mlhuang Exp $
9 #
10
11 from pprint import pprint
12 from string import letters, digits, punctuation
13 from traceback import print_exc
14 from optparse import OptionParser
15 import base64
16 import os
17 import socket
18 import xmlrpclib
19
20 from Config import Config
21 from logger import log
22 from random import Random
# Random number generator instance used by the rand*() helpers below
random = Random()

# API handle and credentials are taken from the local Config object
config = Config()
api = config.api
auth = api.auth

# Reference data fetched from the API once, when the module is loaded
boot_states = api.GetBootStates(auth)
# Only the role names are kept here
roles = [role['name'] for role in api.GetRoles(auth)]
methods = api.GetNetworkMethods(auth)
types = api.GetNetworkTypes(auth)
33
34 #ifrom PLC.Shell import Shell
35 #shell = Shell(globals())
36
def randfloat(min = 0.0, max = 1.0):
    """
    Return a random float obtained by scaling random.random() into the
    interval that starts at min and spans up to max.
    """
    low = float(min)
    span = float(max) - low
    return low + (random.random() * span)
39
def randint(min = 0, max = 1):
    """
    Return a random integer N such that min <= N <= max.

    A float is drawn between min and max + 1 and then floored. Flooring
    (rather than int(), which rounds towards zero) keeps the result inside
    the requested range when the bounds are negative: with plain
    truncation a draw between max and max + 1 for max == -1 would yield 0.
    For non-negative draws the result is the same as before.
    """
    from math import floor

    return int(floor(randfloat(min, max + 1)))
42
43 # See "2.2 Characters" in the XML specification:
44 #
45 # #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD]
46 # avoiding
47 # [#x7F-#x84], [#x86-#x9F], [#xFDD0-#xFDDF]
48
49 ascii_xml_chars = map(unichr, [0x9, 0xA, 0xD])
50 ascii_xml_chars += map(unichr, xrange(0x20, 0x7F - 1))
51 low_xml_chars = list(ascii_xml_chars)
52 low_xml_chars += map(unichr, xrange(0x84 + 1, 0x86 - 1))
53 low_xml_chars += map(unichr, xrange(0x9F + 1, 0xFF))
54 valid_xml_chars = list(low_xml_chars)
55 valid_xml_chars += map(unichr, xrange(0xFF + 1, 0xD7FF))
56 valid_xml_chars += map(unichr, xrange(0xE000, 0xFDD0 - 1))
57 valid_xml_chars += map(unichr, xrange(0xFDDF + 1, 0xFFFD))
58
59
def randstr(length, pool = valid_xml_chars, encoding = "utf-8"):
    """
    Return a random unicode string made of characters from pool whose
    encoded form (in the given encoding) is exactly length bytes long.
    """
    chars = random.sample(pool, min(length, len(pool)))
    s = u''.join(chars)
    encoded_len = len(s.encode(encoding))

    # Shrink or grow the sample until the encoded size matches
    while encoded_len != length:
        if encoded_len > length:
            chars.pop()
        else:
            missing = length - encoded_len
            chars += random.sample(pool, min(missing, len(pool)))
            random.shuffle(chars)
        s = u''.join(chars)
        encoded_len = len(s.encode(encoding))

    return s
73
def randhostname():
    """
    Return a random three-part hostname.
    """
    # 1. Each part begins and ends with a letter or number.
    # 2. Each part except the last can contain letters, numbers, or hyphens.
    # 3. Each part is between 1 and 64 characters, including the trailing dot.
    # 4. At least two parts.
    # 5. Last part can only contain between 2 and 6 letters.
    label_chars = letters + digits + '-'
    first = 'a' + randstr(61, label_chars) + '1.'
    second = 'b' + randstr(61, label_chars) + '2.'
    last = 'c' + randstr(5, letters)
    return first + second + last
84
def randpath(length):
    """
    Return a random path of 1 to 10 components joined with os.sep and
    cut off after length characters.
    """
    count = randint(1, 10)
    parts = [randstr(randint(1, 30), ascii_xml_chars) for unused in range(count)]
    path = os.sep.join(parts)
    return path[:length]
90
def randemail():
    """
    Return a random, lower-cased e-mail address: a 100 character
    alphanumeric local part followed by "@" and a random hostname.
    """
    local_part = randstr(100, letters + digits)
    address = local_part + "@" + randhostname()
    return address.lower()
93
def randkey(bits = 2048):
    """
    Return a random string laid out as "<type> <base64 data> <email>".
    The data part is the base64 encoding of bits / 8 random bytes.
    """
    key_types = ["ssh-dss", "ssh-rsa"]
    key_type = random.sample(key_types, 1)[0]
    blob = base64.b64encode(randstr(bits / 8).encode("utf-8"))
    comment = randemail()
    return ' '.join([key_type, blob, comment])
100
def random_site():
    """
    Return a dictionary of random field values for a site.
    """
    site_fields = {}
    site_fields['name'] = randstr(254)
    site_fields['abbreviated_name'] = randstr(50)
    site_fields['login_base'] = randstr(20, letters).lower()
    # Coordinates are cut to three decimal places
    site_fields['latitude'] = int(randfloat(-90.0, 90.0) * 1000) / 1000.0
    site_fields['longitude'] = int(randfloat(-180.0, 180.0) * 1000) / 1000.0
    return site_fields
109             
def random_address_type():
    """
    Return a dictionary of random field values for an address type.
    """
    name = randstr(20)
    description = randstr(254)
    return {'name': name, 'description': description}
115
def random_address():
    """
    Return a dictionary of random field values for an address.
    """
    # (field name, encoded length in bytes), in generation order
    field_sizes = [
        ('line1', 254),
        ('line2', 254),
        ('line3', 254),
        ('city', 254),
        ('state', 254),
        ('postalcode', 64),
        ('country', 128),
        ]

    address_fields = {}
    for field, size in field_sizes:
        address_fields[field] = randstr(size)
    return address_fields
126
def random_person():
    """
    Return a dictionary of random field values for a person account.
    """
    person_fields = {}
    person_fields['first_name'] = randstr(128)
    person_fields['last_name'] = randstr(128)
    person_fields['email'] = randemail()
    person_fields['bio'] = randstr(254)
    # Accounts are disabled by default
    person_fields['enabled'] = False
    person_fields['password'] = randstr(254)
    return person_fields
137
def random_key():
    """
    Return a dictionary of random field values for a key.
    """
    # key_types used to be referenced as a global that is never defined in
    # this module (the only key_types is a local of randkey()), so calling
    # this function raised NameError.
    # NOTE(review): assumes the API offers GetKeyTypes(auth) alongside the
    # other Get* lookups done at the top of the file -- confirm.
    key_types = api.GetKeyTypes(auth)
    return {
        'key_type': random.sample(key_types, 1)[0],
        'key': randkey()
        }
143
def random_slice(login_base = None):
    """
    Return a dictionary of random field values for a slice.

    login_base -- prefix for the slice name. When omitted, the
                  login_base of a module-level ``site`` dictionary is
                  used, as before; no such global is defined in this
                  file, so callers should normally pass the value
                  explicitly.
    """
    if login_base is None:
        login_base = site['login_base']

    return {
        'name': login_base + "_" + randstr(11, letters).lower(),
        'url': "http://" + randhostname() + "/",
        'description': randstr(2048),
        }
150
def random_nodegroup():
    """
    Return a dictionary of random field values for a node group.
    """
    name = randstr(50)
    description = randstr(200)
    return {'name': name, 'description': description}
156
def random_node():
    """
    Return a dictionary of random field values for a node. The boot
    state is picked from the boot_states list loaded at module level.
    """
    node_fields = {}
    node_fields['hostname'] = randhostname()
    node_fields['boot_state'] = random.sample(boot_states, 1)[0]
    node_fields['model'] = randstr(255)
    node_fields['version'] = randstr(64)
    return node_fields
164
def random_nodenetwork():
    """
    Return a dictionary of random field values for a node network.

    Unless the chosen method is 'dhcp', a consistent set of dotted-quad
    addresses (ip, netmask, network, broadcast, gateway, dns1) is added.
    """
    # struct is not imported at the top of the file
    import struct

    nodenetwork_fields = {
        'method': random.sample(methods, 1)[0],
        'type': random.sample(types, 1)[0],
        'bwlimit': randint(500000, 10000000),
        }

    # The original tested an undefined name "method" here
    if nodenetwork_fields['method'] != 'dhcp':
        ip = randint(0, 0xffffffff)
        # Between 2 and 31 host bits
        netmask = (0xffffffff << randint(2, 31)) & 0xffffffff
        network = ip & netmask
        broadcast = (network | ~netmask) & 0xffffffff
        # Any address strictly between network and broadcast
        gateway = randint(network + 1, broadcast - 1)
        dns1 = randint(0, 0xffffffff)

        addresses = [
            ('ip', ip),
            ('netmask', netmask),
            ('network', network),
            ('broadcast', broadcast),
            ('gateway', gateway),
            ('dns1', dns1),
            ]
        for field, value in addresses:
            # Pack as a big-endian 32-bit integer and render as a.b.c.d
            nodenetwork_fields[field] = socket.inet_ntoa(struct.pack('>L', value))

    return nodenetwork_fields
184
def random_pcu():
    """
    Return a dictionary of random field values for a PCU.
    """
    # struct is not imported at the top of the file; without this the
    # function raised NameError.
    import struct

    return {
        'hostname': randhostname(),
        # Random 32-bit value rendered as a dotted-quad address
        'ip': socket.inet_ntoa(struct.pack('>L', randint(0, 0xffffffff))),
        'protocol': randstr(16),
        'username': randstr(254),
        'password': randstr(254),
        'notes': randstr(254),
        'model': randstr(32),
        }
195
def random_conf_file():
    """
    Return a dictionary of random field values for a configuration file.
    """
    owner_chars = letters + '_' + digits

    conf_file_fields = {}
    conf_file_fields['enabled'] = bool(randint())
    conf_file_fields['source'] = randpath(255)
    conf_file_fields['dest'] = randpath(255)
    # Octal string with a leading zero
    conf_file_fields['file_permissions'] = "%#o" % randint(0, 512)
    conf_file_fields['file_owner'] = randstr(32, owner_chars)
    conf_file_fields['file_group'] = randstr(32, owner_chars)
    for field in ('preinstall_cmd', 'postinstall_cmd', 'error_cmd'):
        conf_file_fields[field] = randpath(100)
    conf_file_fields['ignore_cmd_errors'] = bool(randint())
    conf_file_fields['always_update'] = bool(randint())
    return conf_file_fields
210
def random_attribute_type():
    """
    Return a dictionary of random field values for an attribute type.
    """
    # The module-level "roles" is a plain list of role names, so the
    # original roles.values() raised AttributeError. Keep supporting a
    # name -> role_id mapping if one is provided; otherwise ask the API
    # for the role ids.
    if hasattr(roles, 'values'):
        role_ids = list(roles.values())
    else:
        role_ids = [role['role_id'] for role in api.GetRoles(auth)]

    return {
        'name': randstr(100),
        'description': randstr(254),
        'min_role_id': random.sample(role_ids, 1)[0],
        }
217
def isequal(object_fields, expected_fields):
    """
    Assert that every field in expected_fields is present in
    object_fields with an equal value. Extra fields in object_fields
    are ignored.
    """
    for key in expected_fields:
        wanted = expected_fields[key]
        assert key in object_fields
        assert object_fields[key] == wanted
222
def islistequal(list1, list2):
    """
    Assert that both sequences contain the same elements, ignoring
    order and duplicates.

    Elements are normally compared as sets. When they are not hashable
    (for example a list of dictionaries compared against an empty list),
    membership is checked in both directions instead, so that a mismatch
    is reported as an AssertionError rather than a TypeError.
    """
    try:
        set1, set2 = set(list1), set(list2)
    except TypeError:
        for item in list1:
            assert item in list2
        for item in list2:
            assert item in list1
        return

    assert set1 == set2
225
def isunique(id, id_list):
    """
    Assert that id does not already appear in id_list.
    """
    already_present = id in id_list
    assert not already_present
228
def get_methods(self, prefix):
    """
    Return the names in self.methods that start with prefix, as a list.

    self   -- any object with a "methods" attribute holding method names
    prefix -- leading string to match, e.g. 'Add'
    """
    # The original built this list but never returned it
    return [name for name in self.methods if name.startswith(prefix)]
231
232
233
# Table of tests to run, one dictionary per entity:
# test_type: (Method, arguments, method_result_holder, number_to_add)
#
# The only entry was left unfinished (unterminated tuple and dictionary,
# reference to an undefined site_ids), which made the whole module
# unparseable. It is kept below as a comment until it is completed:
#
#    {'add': (api.AddSite, random_site(), site_ids, 3),
#     'add_check': (isunique,
tests = []
240
class Entity:
    """
    Template class for testing the Add, Update, Get and Delete methods
    of one kind of API object.
    """
    def __init__(self, auth, add_method, update_method, get_method, delete_method, primary_key):
        """
        auth          -- authentication structure passed to every call
        add_method    -- callable (auth, fields) returning the new id
        update_method -- callable (auth, id, fields)
        get_method    -- callable (auth, [ids]) returning a list of objects
        delete_method -- callable (auth, id)
        primary_key   -- name of the id field in the returned objects
        """
        self.methods_tested = []
        for method in [add_method, update_method, get_method, delete_method]:
            self.methods_tested.append(self._method_name(method))

        self.auth = auth
        self.Add = log(add_method)
        self.Update = log(update_method)
        self.Get = log(get_method)
        self.Delete = log(delete_method)
        self.primary_key = primary_key
        self.object_ids = []

    def _method_name(self, method):
        """
        Return a printable name for an API method object.
        """
        # NOTE(review): presumably these are xmlrpclib proxy methods, which
        # keep their name in the name-mangled attribute _Method__name and
        # answer any other attribute lookup with a new proxy object rather
        # than a string -- hence the type check. Confirm against Config.api.
        for attr in ('_Method__name', '__name__'):
            name = getattr(method, attr, None)
            if isinstance(name, str):
                return name
        return str(method)

    def test(self, args_method, num):
        """
        Add num objects built from args_method(), verifying each one after
        the add and again after an update with fresh random fields.
        Returns the list of ids that were added.
        """
        self.object_ids = []

        for i in range(num):
            # Add object
            object_fields = args_method()
            object_id = self.Add(self.auth, object_fields)

            # Should return a unique id
            AddCheck = log(isunique, 'Unique Check')
            AddCheck(object_id, self.object_ids)
            self.object_ids.append(object_id)

            # Check object
            object = self.Get(self.auth, [object_id])[0]
            CheckObject = log(isequal, 'Add Check')
            CheckObject(object, object_fields)

            # Update object
            object_fields = args_method()
            self.Update(self.auth, object_id, object_fields)

            # Check again
            object = self.Get(self.auth, [object_id])[0]
            CheckObject = log(isequal, 'Update Check')
            CheckObject(object, object_fields)

        # Check that getting all the objects at once returns the same ids
        objects = self.Get(self.auth, self.object_ids)
        CheckObjects = log(islistequal, 'Get Check')
        CheckObjects(self.object_ids, [object[self.primary_key] for object in objects])

        return self.object_ids

    def cleanup(self):
        """
        Delete every object added by test() and verify that none is left.
        """
        # Delete objects
        for object_id in self.object_ids:
            self.Delete(self.auth, object_id)

        # Check if objects are deleted
        CheckObjects = log(islistequal, 'Delete Check')
        CheckObjects(self.Get(self.auth, self.object_ids), [])
302         
303
304         
class Test:
    """
    Functional test of the API: adds random sites, address types,
    addresses and persons, checks what the API returns for them, and
    deletes them again in cleanup().

    All calls go through the module-level api and auth objects.
    """
    def __init__(self, config = None, verbose = True, check = True):
        """
        config  -- accepted for compatibility with existing callers; the
                   module-level api/auth are what is actually used
        verbose -- print the ids that were added or deleted
        check   -- in the address and person tests, read every object
                   back and verify it
        """
        self.check = check
        self.verbose = verbose
        self.methods = set(api.system.listMethods())
        self.methods_tested = set()

        self.site_ids = []
        self.address_type_ids = []
        self.address_ids = []
        self.person_ids = []

    def _api(self, name, *args):
        """
        Call API method name with the module-level auth followed by args,
        and record the method as tested.
        """
        result = getattr(api, name)(auth, *args)
        self.methods_tested.add(name)
        return result

    def run(self,
            sites = 1,
            address_types = 3,
            addresses = 2,
            persons = 1000,
            keys = 3):
        """
        Run the add/check/update tests, then list every API method that
        was not exercised. Exceptions are printed, not propagated.
        The keys argument is currently unused.
        """
        try:
            try:
                # Entity indexes the result of Get with [0], so it needs
                # the list-returning GetSites used elsewhere in this class.
                # NOTE(review): Entity.test() updates every field including
                # login_base, which Sites() below avoids ("cannot change
                # login_base") -- the update step may be rejected.
                site_test = Entity(auth, api.AddSite, api.UpdateSite, api.GetSites, api.DeleteSite, 'site_id')
                try:
                    site_test.test(random_site, sites)
                finally:
                    # Keep whatever was created, even after a failure, so
                    # that the following steps and cleanup() can use it
                    self.site_ids = list(site_test.object_ids)
                self.methods_tested.update(['AddSite', 'GetSites', 'UpdateSite'])

                self.AddAddressTypes(address_types)
                self.AddAddresses(addresses)
                self.AddPersons(persons)
            except:
                # Deliberately broad: report the failure and still print
                # the summary below
                print_exc()
        finally:
            # The original wrote to an undefined "test_log"; use stdout
            for method in sorted(self.methods.difference(self.methods_tested)):
                print("%s [Not Tested]" % method)

    def cleanup(self):
        """
        Delete everything that was added, in reverse order of creation.
        """
        self.DeletePersons()
        self.DeleteAddresses()
        self.DeleteAddressTypes()
        self.DeleteSites()

    def Sites(self, n = 1):
        """
        Add and Modify a random site.
        """
        # Wrapped before the loop so that the final check also works
        # when n is 0
        GetSites = log(api.GetSites)

        for i in range(n):
            # Add site
            site_fields = random_site()
            AddSite = log(api.AddSite)
            site_id = AddSite(auth, site_fields)
            self.methods_tested.update(['AddSite'])

            # Should return a unique site_id
            CheckSite = log(isunique, 'Unique Check')
            CheckSite(site_id, self.site_ids)
            self.site_ids.append(site_id)

            # Check site
            site = GetSites(auth, [site_id])[0]
            CheckSite = log(isequal, 'AddSite Check')
            CheckSite(site, site_fields)
            self.methods_tested.update(['GetSites'])

            # Update site
            site_fields = random_site()
            # XXX Currently cannot change login_base
            del site_fields['login_base']
            site_fields['max_slices'] = randint(1, 10)
            UpdateSite = log(api.UpdateSite)
            UpdateSite(auth, site_id, site_fields)
            self.methods_tested.update(['UpdateSite'])

            # Check site again
            site = GetSites(auth, [site_id])[0]
            CheckSite = log(isequal, 'UpdateSite Check')
            CheckSite(site, site_fields)

        # Check Get all sites
        sites = GetSites(auth, self.site_ids)
        CheckSite = log(islistequal, 'GetSites Check')
        CheckSite(self.site_ids, [site['site_id'] for site in sites])

        if self.verbose:
            print("Added sites %s" % (self.site_ids,))

    def DeleteSites(self):
        """
        Delete any random sites we may have added.
        """
        # Delete all sites
        DeleteSite = log(api.DeleteSite)
        for site_id in self.site_ids:
            DeleteSite(auth, site_id)
        self.methods_tested.update(['DeleteSite'])

        # Check if sites are deleted
        CheckSite = log(islistequal, 'DeleteSite Check')
        CheckSite(api.GetSites(auth, self.site_ids), [])

        if self.verbose:
            print("Deleted sites %s" % (self.site_ids,))

        self.site_ids = []

    def AddAddressTypes(self, n = 3):
        """
        Add a number of random address types.
        """
        # Wrapped before the loop so that the final check also works
        # when n is 0
        GetAddressTypes = log(api.GetAddressTypes)

        for i in range(n):
            address_type_fields = random_address_type()
            AddAddressType = log(api.AddAddressType)
            address_type_id = AddAddressType(auth, address_type_fields)
            self.methods_tested.update(['AddAddressType'])

            # Should return a unique address_type_id
            CheckAddressType = log(isunique, 'Unique Check')
            CheckAddressType(address_type_id, self.address_type_ids)
            self.address_type_ids.append(address_type_id)

            # Check address type
            address_type = GetAddressTypes(auth, [address_type_id])[0]
            CheckAddressType = log(isequal, 'AddAddressType Check')
            CheckAddressType(address_type, address_type_fields)

            # Update address type
            address_type_fields = random_address_type()
            UpdateAddressType = log(api.UpdateAddressType)
            UpdateAddressType(auth, address_type_id, address_type_fields)

            # Check address type again (auth was missing from this call)
            address_type = GetAddressTypes(auth, [address_type_id])[0]
            CheckAddressType = log(isequal, 'UpdateAddressType Check')
            CheckAddressType(address_type, address_type_fields)
            self.methods_tested.update(['UpdateAddressType'])

        # Check get all address types
        address_types = GetAddressTypes(auth, self.address_type_ids)
        CheckAddressType = log(islistequal, 'GetAddressType Check')
        CheckAddressType(self.address_type_ids,
                         [address_type['address_type_id'] for address_type in address_types])
        self.methods_tested.update(['GetAddressTypes'])

        if self.verbose:
            print("Added address types %s" % (self.address_type_ids,))

    def DeleteAddressTypes(self):
        """
        Delete any random address types we may have added.
        """
        DeleteAddressType = log(api.DeleteAddressType)
        for address_type_id in self.address_type_ids:
            DeleteAddressType(auth, address_type_id)
        self.methods_tested.update(['DeleteAddressType'])

        CheckAddressType = log(islistequal, 'DeleteAddressType Check')
        CheckAddressType(api.GetAddressTypes(auth, self.address_type_ids), [])

        if self.verbose:
            print("Deleted address types %s" % (self.address_type_ids,))

        self.address_type_ids = []

    def AddAddresses(self, n = 3):
        """
        Add a number of random addresses to each site.
        """
        # The calls below used bare function names (AddSiteAddress, ...)
        # that are not defined in this module; they now go through the
        # API object with auth as first argument, like the site tests.
        for site_id in self.site_ids:
            for i in range(n):
                address_fields = random_address()
                address_id = self._api('AddSiteAddress', site_id, address_fields)

                # Should return a unique address_id
                assert address_id not in self.address_ids
                self.address_ids.append(address_id)

                if self.check:
                    # Check address
                    address = self._api('GetAddresses', [address_id])[0]
                    for field in address_fields:
                        assert address[field] == address_fields[field]

                    # Update address
                    address_fields = random_address()
                    self._api('UpdateAddress', address_id, address_fields)

                    # Check address again
                    address = self._api('GetAddresses', [address_id])[0]
                    for field in address_fields:
                        assert address[field] == address_fields[field]

                # Add address types
                for address_type_id in self.address_type_ids:
                    self._api('AddAddressTypeToAddress', address_type_id, address_id)

        if self.check:
            addresses = self._api('GetAddresses', self.address_ids)
            assert set(self.address_ids) == set([address['address_id'] for address in addresses])
            for address in addresses:
                assert set(self.address_type_ids) == set(address['address_type_ids'])

        if self.verbose:
            print("Added addresses %s" % (self.address_ids,))

    def DeleteAddresses(self):
        """
        Delete any random addresses we may have added.
        """
        # Delete site addresses
        for address_id in self.address_ids:
            # Remove address types
            for address_type_id in self.address_type_ids:
                self._api('DeleteAddressTypeFromAddress', address_type_id, address_id)

            if self.check:
                address = self._api('GetAddresses', [address_id])[0]
                assert not address['address_type_ids']

            self._api('DeleteAddress', address_id)
            if self.check:
                assert not self._api('GetAddresses', [address_id])

        if self.check:
            assert not self._api('GetAddresses', self.address_ids)

        if self.verbose:
            print("Deleted addresses %s" % (self.address_ids,))

        self.address_ids = []

    def AddPersons(self, n = 3):
        """
        Add a number of random users to each site.
        """
        # Map role names to role ids
        role_ids_by_name = dict((role['name'], role['role_id'])
                                for role in self._api('GetRoles'))

        for i in range(n):

            # Add account
            person_fields = random_person()
            person_id = self._api('AddPerson', person_fields)

            # Should return a unique person_id
            assert person_id not in self.person_ids
            self.person_ids.append(person_id)

            if self.check:
                # Check account
                person = self._api('GetPersons', [person_id])[0]
                for field in person_fields:
                    if field != 'password':
                        assert person[field] == person_fields[field]

                # Update account
                person_fields = random_person()
                self._api('UpdatePerson', person_id, person_fields)

                # Check account again
                person = self._api('GetPersons', [person_id])[0]
                for field in person_fields:
                    if field != 'password':
                        assert person[field] == person_fields[field]

            # Credentials of the new account. Named person_auth because
            # assigning to "auth" here would shadow the module-level
            # credentials for the whole method.
            person_auth = {'AuthMethod': "password",
                           'Username': person_fields['email'],
                           'AuthString': person_fields['password']}

            if self.check:
                # Check that account is disabled. The assertion used to sit
                # inside a bare "except: pass" and could never fail.
                # NOTE(review): presumably a rejected login is reported by
                # raising -- confirm which exception type the API uses.
                try:
                    authenticated = api.AuthCheck(person_auth)
                except Exception:
                    authenticated = False
                assert not authenticated
                self.methods_tested.add('AuthCheck')

            # Add random set of roles
            person_roles = random.sample(['user', 'pi', 'tech'], randint(1, 3))
            for person_role in person_roles:
                role_id = role_ids_by_name[person_role]
                self._api('AddRoleToPerson', role_id, person_id)

            if self.check:
                person = self._api('GetPersons', [person_id])[0]
                assert set(person_roles) == set(person['roles'])

            # Enable account
            self._api('UpdatePerson', person_id, {'enabled': True})

            if self.check:
                # Check that account is enabled
                assert api.AuthCheck(person_auth)
                self.methods_tested.add('AuthCheck')

            # Associate account with random set of sites
            person_site_ids = []
            for site_id in random.sample(self.site_ids, randint(1, len(self.site_ids))):
                self._api('AddPersonToSite', person_id, site_id)
                person_site_ids.append(site_id)

            if self.check:
                # Make sure it really did it
                person = self._api('GetPersons', [person_id])[0]
                assert set(person_site_ids) == set(person['site_ids'])

            # Set a primary site
            primary_site_id = random.sample(person_site_ids, randint(1, len(person_site_ids)))[0]
            self._api('SetPersonPrimarySite', person_id, primary_site_id)

            if self.check:
                person = self._api('GetPersons', [person_id])[0]
                assert person['site_ids'][0] == primary_site_id

        if self.verbose:
            print("Added users %s" % (self.person_ids,))

    def DeletePersons(self):
        """
        Delete any random users we may have added.
        """
        # Delete users
        for person_id in self.person_ids:
            # Remove from each site
            for site_id in self.site_ids:
                self._api('DeletePersonFromSite', person_id, site_id)

            if self.check:
                person = self._api('GetPersons', [person_id])[0]
                assert not person['site_ids']

            # Revoke roles
            person = self._api('GetPersons', [person_id])[0]
            for role_id in person['role_ids']:
                self._api('DeleteRoleFromPerson', role_id, person_id)

            if self.check:
                person = self._api('GetPersons', [person_id])[0]
                assert not person['role_ids']

            # Disable account
            self._api('UpdatePerson', person_id, {'enabled': False})

            if self.check:
                person = self._api('GetPersons', [person_id])[0]
                assert not person['enabled']

            # Delete account
            self._api('DeletePerson', person_id)

            if self.check:
                assert not self._api('GetPersons', [person_id])

        if self.check:
            assert not self._api('GetPersons', self.person_ids)

        if self.verbose:
            print("Deleted users %s" % (self.person_ids,))

        self.person_ids = []
668
if __name__ == "__main__":
    # Command line entry point: run the tests, then clean up unless asked
    # to leave the generated objects in place.
    parser = OptionParser()
    parser.add_option("-c", "--check", action = "store_true", default = False, help = "Verify actions (default: %default)")
    parser.add_option("-q", "--quiet", action = "store_true", default = False, help = "Be quiet (default: %default)")
    parser.add_option("-p", "--populate", action = "store_true", default = False, help = "Do not cleanup (default: %default)")
    (options, args) = parser.parse_args()

    # Test() takes the configuration as its first argument and has no
    # "check" keyword, so the original call raised TypeError. The flag is
    # set on the instance instead.
    test = Test(config, verbose = not options.quiet)
    test.check = options.check

    test.run()
    if not options.populate:
        test.cleanup()