7bc181d9b929a679a5d0e45d14b9e7bda4c1b8ed
[tests.git] / qaapi / qa / tests / api_unit_test.py
1 #!/usr/bin/python
2 #
3 # Test script example
4 #
5 # Copyright (C) 2006 The Trustees of Princeton University
6 #
7 #
8
from pprint import pprint
from random import Random
from string import letters, digits, punctuation
from traceback import print_exc
import base64
import os, sys
import socket
import struct
import time
import xmlrpclib

from Test import Test
from qa import utils
from qa.Config import Config
from qa.logger import Logfile, log
23
# Shared random source used by every rand*/random_* helper below.
random = Random()

# API handle and credentials come from the QA configuration.
config = Config()
auth = config.auth

# Ask the API which values it accepts. If a call fails for any reason, fall
# back to a hard-coded list so that the module can still be imported.
try:
    boot_states = config.api.GetBootStates(auth)
except Exception:
    boot_states = [u'boot', u'dbg', u'inst', u'new', u'rcnf', u'rins']

try:
    roles = [role['name'] for role in config.api.GetRoles(auth)]
except Exception:
    roles = [u'admin', u'pi', u'user', u'tech']

try:
    methods = config.api.GetNetworkMethods(auth)
except Exception:
    methods = [u'static', u'dhcp', u'proxy', u'tap', u'ipmi', u'unknown']

try:
    types = config.api.GetNetworkTypes(auth)
except Exception:
    types = [u'ipv4']

# random_key() picks its 'key_type' from this list; without it that helper
# raises NameError.
# NOTE(review): GetKeyTypes and the u'ssh' fallback follow the PLCAPI
# reference -- confirm against the deployed API.
try:
    key_types = config.api.GetKeyTypes(auth)
except Exception:
    key_types = [u'ssh']
40
def randfloat(min = 0.0, max = 1.0):
    """
    Return a random float between min and max.

    Both bounds are converted with float(); the value is min plus a
    random fraction (from random.random()) of the distance to max.
    """
    low = float(min)
    fraction = random.random()
    span = float(max) - low
    return low + (fraction * span)
43
def randint(min = 0, max = 1):
    """
    Return a random integer built from randfloat(min, max + 1).

    The float is truncated with int(), so for non-negative bounds the
    result lies between min and max inclusive.
    """
    upper = max + 1
    value = randfloat(min, upper)
    return int(value)
46
47 # See "2.2 Characters" in the XML specification:
48 #
49 # #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD]
50 # avoiding
51 # [#x7F-#x84], [#x86-#x9F], [#xFDD0-#xFDDF]
52
# xrange() excludes its upper bound, so each range below names the first code
# point that must NOT be part of the pool. (The earlier "- 1" adjustments
# silently dropped the last allowed character(s) of every range.)

# Tab, LF, CR plus printable ASCII [#x20-#x7E]
ascii_xml_chars = list(map(unichr, [0x9, 0xA, 0xD]))
ascii_xml_chars += list(map(unichr, xrange(0x20, 0x7F)))

# ... plus #x85 and Latin-1 [#xA0-#xFF]
low_xml_chars = list(ascii_xml_chars)
low_xml_chars += list(map(unichr, xrange(0x84 + 1, 0x86)))
low_xml_chars += list(map(unichr, xrange(0x9F + 1, 0xFF + 1)))

# ... plus [#x100-#xD7FF], [#xE000-#xFDCF] and [#xFDE0-#xFFFD]
valid_xml_chars = list(low_xml_chars)
valid_xml_chars += list(map(unichr, xrange(0xFF + 1, 0xD7FF + 1)))
valid_xml_chars += list(map(unichr, xrange(0xE000, 0xFDD0)))
valid_xml_chars += list(map(unichr, xrange(0xFDDF + 1, 0xFFFD + 1)))
62
def randstr(length, pool = valid_xml_chars, encoding = "utf-8"):
    """
    Return a random unicode string that is exactly `length` bytes long
    once encoded with `encoding`.

    Characters are sampled from `pool`. The candidate is trimmed by one
    character while it encodes too long, and topped up with fresh
    samples (then reshuffled) while it encodes too short.
    """
    chars = random.sample(pool, min(length, len(pool)))
    text = u''.join(chars)
    encoded_size = len(text.encode(encoding))

    while encoded_size != length:
        if encoded_size > length:
            # Too long: drop one character and measure again
            chars.pop()
        else:
            # Too short: add up to one character per missing byte
            missing = length - encoded_size
            chars += random.sample(pool, min(missing, len(pool)))
            random.shuffle(chars)
        text = u''.join(chars)
        encoded_size = len(text.encode(encoding))

    return text
76
def randhostname():
    """
    Return a random three-part hostname.

    Rules the result is built to satisfy:
      1. Each part begins and ends with a letter or number.
      2. Each part except the last can contain letters, numbers, or hyphens.
      3. Each part is between 1 and 64 characters, including the trailing dot.
      4. At least two parts.
      5. Last part can only contain between 2 and 6 letters.
    """
    label_chars = letters + digits + '-'
    # Fixed first/last characters guarantee rule 1 whatever randstr() returns
    first_part = 'a' + randstr(61, label_chars) + '1'
    second_part = 'b' + randstr(61, label_chars) + '2'
    last_part = 'c' + randstr(5, letters)
    return '.'.join([first_part, second_part, last_part])
87
def randpath(length):
    """
    Return a random path of at most `length` characters.

    Between 1 and 10 components of 1 to 30 ASCII characters each are
    joined with os.sep and the result is cut off at `length`.
    """
    count = randint(1, 10)
    components = [randstr(randint(1, 30), ascii_xml_chars) for _ in range(count)]
    path = os.sep.join(components)
    return path[:length]
93
def randemail():
    """
    Return a random lower-case e-mail address: 100 letters/digits,
    an "@", then a random hostname.
    """
    local_part = randstr(100, letters + digits)
    address = local_part + "@" + randhostname()
    return address.lower()
96
def randkey(bits = 2048):
    """
    Return a random string laid out like a public key line:
    "<type> <base64 data> <email>".

    The type is "ssh-dss" or "ssh-rsa"; the data is the base64 form of
    bits / 8 random bytes (a UTF-8 encoded randstr()).
    """
    key_types = ["ssh-dss", "ssh-rsa"]
    key_type = random.sample(key_types, 1)[0]
    raw_bytes = randstr(bits / 8).encode("utf-8")
    encoded = base64.b64encode(raw_bytes)
    comment = randemail()
    return ' '.join([key_type, encoded, comment])
103
def random_site():
    """
    Return random field values for a site.

    Latitude and longitude are truncated to three decimal places.
    """
    site_fields = {}
    site_fields['name'] = randstr(254)
    site_fields['abbreviated_name'] = randstr(50)
    site_fields['login_base'] = randstr(20, letters).lower()

    latitude = randfloat(-90.0, 90.0)
    site_fields['latitude'] = int(latitude * 1000) / 1000.0
    longitude = randfloat(-180.0, 180.0)
    site_fields['longitude'] = int(longitude * 1000) / 1000.0

    return site_fields
112             
def random_address_type():
    """Return random field values (name, description) for an address type."""
    name = randstr(20)
    description = randstr(254)
    return {'name': name, 'description': description}
118
def random_address():
    """Return random field values for an address."""
    # (field name, encoded length in bytes), generated in this order
    layout = [
        ('line1', 254),
        ('line2', 254),
        ('line3', 254),
        ('city', 254),
        ('state', 254),
        ('postalcode', 64),
        ('country', 128),
        ]
    return dict([(field, randstr(size)) for field, size in layout])
129
def random_person():
    """
    Return random field values for an account.

    The account is always generated with 'enabled' set to False.
    """
    person_fields = {}
    person_fields['first_name'] = randstr(128)
    person_fields['last_name'] = randstr(128)
    person_fields['email'] = randemail()
    person_fields['bio'] = randstr(254)
    # Accounts are disabled by default
    person_fields['enabled'] = False
    person_fields['password'] = randstr(254)
    return person_fields
140
def random_key():
    """
    Return random field values for a key.

    'key_type' is picked from the module-level key_types sequence and
    'key' comes from randkey().
    """
    key_fields = {}
    key_fields['key_type'] = random.sample(key_types, 1)[0]
    key_fields['key'] = randkey()
    return key_fields
146
def random_slice(login_base = None):
    """
    Return random field values for a slice.

    login_base -- prefix of the slice name (the name is
    "<login_base>_<11 random lower-case letters>"). When omitted, the
    'login_base' entry of a module-level `site` dict is used, which is
    what this function always did.
    NOTE(review): no module-level `site` is assigned in this file, so
    callers should pass login_base explicitly.
    """
    if login_base is None:
        login_base = site['login_base']

    return {
        'name': login_base + "_" + randstr(11, letters).lower(),
        'url': "http://" + randhostname() + "/",
        'description': randstr(2048),
        }
153
def random_nodegroup():
    """Return random field values (name, description) for a node group."""
    name = randstr(50)
    description = randstr(200)
    return {'name': name, 'description': description}
159
def random_node():
    """
    Return random field values for a node.

    'boot_state' is picked from the module-level boot_states list.
    """
    node_fields = {'hostname': randhostname()}
    node_fields['boot_state'] = random.sample(boot_states, 1)[0]
    node_fields['model'] = randstr(255)
    node_fields['version'] = randstr(64)
    return node_fields
167
def random_nodenetwork():
    """
    Return random field values for a node network interface.

    'method' and 'type' are picked from the module-level methods/types
    lists. Unless the method is 'dhcp', a mutually consistent set of
    dotted-quad addresses (ip, netmask, network, broadcast, gateway,
    dns1) is added as well.
    """
    nodenetwork_fields = {
        'method': random.sample(methods, 1)[0],
        'type': random.sample(types, 1)[0],
        'bwlimit': randint(500000, 10000000),
        }

    # Test the method that was just picked (the bare name `method` was
    # never defined and raised NameError).
    if nodenetwork_fields['method'] != 'dhcp':
        ip = randint(0, 0xffffffff)
        # Clear between 2 and 31 low bits, so the network always has room
        # for a gateway strictly between network and broadcast address
        netmask = (0xffffffff << randint(2, 31)) & 0xffffffff
        network = ip & netmask
        broadcast = (network | ~netmask) & 0xffffffff
        gateway = randint(network + 1, broadcast - 1)
        dns1 = randint(0, 0xffffffff)

        addresses = [
            ('ip', ip),
            ('netmask', netmask),
            ('network', network),
            ('broadcast', broadcast),
            ('gateway', gateway),
            ('dns1', dns1),
            ]
        for field, value in addresses:
            # 32-bit big-endian integer -> dotted quad
            nodenetwork_fields[field] = socket.inet_ntoa(struct.pack('>L', value))

    return nodenetwork_fields
187
def random_pcu():
    """
    Return random field values for a PCU.

    'ip' is a random 32-bit value rendered as a dotted quad.
    """
    pcu_fields = {}
    pcu_fields['hostname'] = randhostname()

    packed_ip = struct.pack('>L', randint(0, 0xffffffff))
    pcu_fields['ip'] = socket.inet_ntoa(packed_ip)

    pcu_fields['protocol'] = randstr(16)
    pcu_fields['username'] = randstr(254)
    pcu_fields['password'] = randstr(254)
    pcu_fields['notes'] = randstr(254)
    pcu_fields['model'] = randstr(32)
    return pcu_fields
198
def random_conf_file():
    """
    Return random field values for a configuration file entry.

    Flags are random booleans, paths come from randpath() and the
    permissions are a random value from 0 to 512 formatted in octal.
    """
    owner_chars = letters + '_' + digits

    conf_file_fields = {}
    conf_file_fields['enabled'] = bool(randint())
    conf_file_fields['source'] = randpath(255)
    conf_file_fields['dest'] = randpath(255)
    conf_file_fields['file_permissions'] = "%#o" % randint(0, 512)
    conf_file_fields['file_owner'] = randstr(32, owner_chars)
    conf_file_fields['file_group'] = randstr(32, owner_chars)
    for command in ('preinstall_cmd', 'postinstall_cmd', 'error_cmd'):
        conf_file_fields[command] = randpath(100)
    conf_file_fields['ignore_cmd_errors'] = bool(randint())
    conf_file_fields['always_update'] = bool(randint())
    return conf_file_fields
213
def random_attribute_type():
    """
    Return random field values for an attribute type.

    'min_role_id' is picked from the module-level `roles`. When `roles`
    is a mapping its values are used; when it is a plain sequence (which
    is how this module builds it at import time) the sequence itself is
    sampled instead of failing on the missing .values().
    NOTE(review): in the sequence case the picked value is a role name,
    not a role id -- confirm what the API expects for min_role_id.
    """
    if hasattr(roles, 'values'):
        role_pool = list(roles.values())
    else:
        role_pool = list(roles)

    return {
        'name': randstr(100),
        'description': randstr(254),
        'min_role_id': random.sample(role_pool, 1)[0],
        }
220
def isequal(object_fields, expected_fields):
    """
    Return True if every key of expected_fields is present in
    object_fields with an equal value, False otherwise.

    Any error raised while comparing (for instance when object_fields is
    None) also yields False. The checks are explicit comparisons rather
    than assert statements, so they still work under "python -O".
    """
    try:
        for field in expected_fields:
            if field not in object_fields:
                return False
            if not (object_fields[field] == expected_fields[field]):
                return False
    except Exception:
        return False
    return True
229
def islistequal(list1, list2):
    """
    Return True if both sequences hold the same set of elements
    (order and duplicates are ignored), False otherwise.

    Arguments that cannot be turned into a set (None, unhashable
    elements) yield False. No assert statement is involved, so the
    result is the same under "python -O".
    """
    try:
        return set(list1) == set(list2)
    except Exception:
        return False
236         
def isunique(id, id_list):
    """
    Return True if id does not occur in id_list, False otherwise.

    An id_list that does not support membership tests (e.g. None)
    yields False. No assert statement is involved, so the result is the
    same under "python -O".
    """
    try:
        return id not in id_list
    except Exception:
        return False
243         
class api_unit_test(Test):
    """
    Drive the API through add/get/update/delete cycles with random data.

    While running, three sets are kept on the instance: all_methods (as
    returned by system.listMethods), methods_tested (every method that
    was wrapped through debug()) and methods_failed (methods that
    raised, plus the labels of consistency checks that did not hold).
    call() writes them to "api-unittest.log".
    """

    def call(self,
             sites = 2,
             nodes = 4,
             address_types = 3,
             addresses = 2,
             persons = 10,
             keys = 3
            ):
        """
        Run the site and node tests, clean up, then write the report.

        sites, nodes -- number of objects of each kind to create.
        address_types, addresses, persons, keys -- accepted but not used
        here: only Sites() and Nodes() are run.
        """
        self.api = self.config.api
        self.auth = self.config.auth
        self.all_methods = set(self.api.system.listMethods())
        self.methods_tested = set()
        self.methods_failed = set()

        try:
            try:
                self.site_ids = self.Sites(sites)
                self.node_ids = self.Nodes(nodes)
            except Exception:
                # A broken test must not prevent cleanup and reporting
                print_exc()
        finally:
            try:
                self.cleanup()
            finally:
                # The report is written even if cleanup itself failed
                logfile = Logfile("api-unittest.log")
                methods_ok = sorted(self.methods_tested.difference(self.methods_failed))
                methods_failed = sorted(self.methods_failed)
                methods_untested = sorted(self.all_methods.difference(self.methods_tested))
                print >> logfile, "\n".join([m+": [OK]" for m in methods_ok])
                print >> logfile, "\n".join([m+": [FAILED]" for m in methods_failed])
                print >> logfile, "\n".join([m+": [Not Tested]" for m in methods_untested])

    def isequal(self, object_fields, expected_fields, method_name):
        """
        Return True if every key of expected_fields is present in
        object_fields with an equal value. Otherwise (or if comparing
        raises) record method_name in methods_failed and return False.
        """
        try:
            matches = True
            for field in expected_fields:
                if field not in object_fields or \
                   not (object_fields[field] == expected_fields[field]):
                    matches = False
                    break
        except Exception:
            matches = False

        if not matches:
            self.methods_failed.add(method_name)
        return matches

    def islistequal(self, list1, list2, method_name):
        """
        Return True if both sequences hold the same set of elements.
        Otherwise (or if either cannot be turned into a set, e.g. None)
        record method_name in methods_failed and return False.
        """
        try:
            matches = set(list1) == set(list2)
        except Exception:
            matches = False

        if not matches:
            self.methods_failed.add(method_name)
        return matches

    def isunique(self, id, id_list, method_name):
        """
        Return True if id is not in id_list. Otherwise (or if the
        membership test raises) record method_name in methods_failed and
        return False.
        """
        try:
            unique = id not in id_list
        except Exception:
            unique = False

        if not unique:
            self.methods_failed.add(method_name)
        return unique

    def debug(self, method, method_name=None):
        """
        Wrap an API method so that a failing call is recorded, not raised.

        method_name defaults to the method's _Method__name attribute.
        The name is added to methods_tested as soon as the wrapper is
        created. The wrapper returns the method's result, or None after
        adding the name to methods_failed if the call raised.
        """
        if method_name is None:
            method_name = method._Method__name

        self.methods_tested.add(method_name)
        def wrapper(*args, **kwds):
            try:
                return method(*args, **kwds)
            except Exception:
                self.methods_failed.add(method_name)
                return None

        return wrapper

    def _first(self, objects, method_name):
        """
        Return objects[0], or None if there is nothing to return.

        None means the API call already failed (and was recorded by the
        debug() wrapper). An empty result means the object just written
        could not be read back, which is recorded under method_name.
        """
        if objects is None:
            return None
        if not objects:
            self.methods_failed.add(method_name)
            return None
        return objects[0]

    def cleanup(self):
        """
        Delete whatever call() created: nodes first, then sites.

        Only attributes that were actually set are cleaned up. The
        person/address/address type deletions below are disabled;
        call() does not create those objects.
        """
        #if self.person_ids: self.DeletePersons()
        #if self.address_ids: self.DeleteAddresses()
        #if self.address_type_ids: self.DeleteAddressTypes()
        if hasattr(self, 'node_ids'): self.DeleteNodes()
        if hasattr(self, 'site_ids'): self.DeleteSites()

    def Sites(self, n=4):
        """
        Add, read back, update and re-read n random sites.

        Returns the list of site ids that were created.
        """
        # Wrapped before the loop because it is needed after it as well,
        # even when no iteration got as far as creating a site.
        GetSites = self.debug(self.api.GetSites)

        site_ids = []
        for i in range(n):
            # Add Site
            site_fields = random_site()
            AddSite = self.debug(self.api.AddSite)
            site_id = AddSite(self.auth, site_fields)
            if site_id is None: continue

            # Should return a unique id
            self.isunique(site_id, site_ids, 'AddSite - isunique')
            site_ids.append(site_id)
            site = self._first(GetSites(self.auth, [site_id]), 'AddSite - isequal')
            if site is None: continue
            self.isequal(site, site_fields, 'AddSite - isequal')

            # Update site
            site_fields = random_site()
            UpdateSite = self.debug(self.api.UpdateSite)
            UpdateSite(self.auth, site_id, site_fields)

            # Check again
            site = self._first(GetSites(self.auth, [site_id]), 'UpdateSite - isequal')
            if site is None: continue
            self.isequal(site, site_fields, 'UpdateSite - isequal')

        sites = GetSites(self.auth, site_ids)
        if sites is not None:
            self.islistequal(site_ids, [site['site_id'] for site in sites], 'GetSites - isequal')

        if self.config.verbose:
            utils.header("Added sites: %s" % site_ids)

        return site_ids

    def DeleteSites(self):
        """Delete every site in self.site_ids and check that none is left."""
        # Delete all sites
        DeleteSite = self.debug(self.api.DeleteSite)
        for site_id in self.site_ids:
            DeleteSite(self.auth, site_id)

        # Check if sites are deleted
        GetSites = self.debug(self.api.GetSites)
        sites = GetSites(self.auth, self.site_ids)
        self.islistequal(sites, [], 'DeleteSite - check')

        if self.config.verbose:
            utils.header("Deleted sites: %s" % self.site_ids)

        self.site_ids = []

    def Nodes(self, n=4):
        """
        Add, read back, update and re-read n random nodes, each attached
        to a site picked at random from self.site_ids.

        Returns the list of node ids that were created.
        """
        # Needed after the loop as well, so wrap it up front
        GetNodes = self.debug(self.api.GetNodes)

        node_ids = []
        for i in range(n):
            # Add Node
            node_fields = random_node()
            site_id = random.sample(self.site_ids, 1)[0]
            AddNode = self.debug(self.api.AddNode)
            node_id = AddNode(self.auth, site_id, node_fields)
            if node_id is None: continue

            # Should return a unique id
            self.isunique(node_id, node_ids, 'AddNode - isunique')
            node_ids.append(node_id)

            # Check nodes
            node = self._first(GetNodes(self.auth, [node_id]), 'AddNode - isequal')
            if node is None: continue
            self.isequal(node, node_fields, 'AddNode - isequal')

            # Update node
            node_fields = random_node()
            UpdateNode = self.debug(self.api.UpdateNode)
            UpdateNode(self.auth, node_id, node_fields)

            # Check again
            node = self._first(GetNodes(self.auth, [node_id]), 'UpdateNode - isequal')
            if node is None: continue
            self.isequal(node, node_fields, 'UpdateNode - isequal')

        nodes = GetNodes(self.auth, node_ids)
        if nodes is not None:
            self.islistequal(node_ids, [node['node_id'] for node in nodes], 'GetNodes - isequal')

        if self.config.verbose:
            utils.header("Added nodes: %s" % node_ids)

        return node_ids

    def DeleteNodes(self):
        """Delete every node in self.node_ids and check that none is left."""
        DeleteNode = self.debug(self.api.DeleteNode)
        for node_id in self.node_ids:
            DeleteNode(self.auth, node_id)

        # Check if nodes are deleted (first argument is the auth structure,
        # as in every other call; it used to be the api object)
        GetNodes = self.debug(self.api.GetNodes)
        nodes = GetNodes(self.auth, self.node_ids)
        self.islistequal(nodes, [], 'DeleteNode Check')

        if self.config.verbose:
            utils.header("Deleted nodes: %s" % self.node_ids)

        self.node_ids = []

    def AddressTypes(self, n = 3):
        """
        Add, read back, update and re-read n random address types.

        Returns the list of address type ids that were created.
        """
        # Needed after the loop as well, so wrap it up front
        GetAddressTypes = self.debug(self.api.GetAddressTypes)

        address_type_ids = []
        for i in range(n):
            address_type_fields = random_address_type()
            AddAddressType = self.debug(self.api.AddAddressType)
            address_type_id = AddAddressType(self.auth, address_type_fields)
            if address_type_id is None: continue

            # Should return a unique address_type_id
            self.isunique(address_type_id, address_type_ids, 'AddAddressType - isunique')
            address_type_ids.append(address_type_id)

            # Check address type
            address_type = self._first(GetAddressTypes(self.auth, [address_type_id]),
                                       'AddAddressType - isequal')
            if address_type is None: continue
            self.isequal(address_type, address_type_fields, 'AddAddressType - isequal')

            # Update address type
            address_type_fields = random_address_type()
            UpdateAddressType = self.debug(self.api.UpdateAddressType)
            result = UpdateAddressType(self.auth, address_type_id, address_type_fields)
            if result is None: continue

            # Check address type again
            address_type = self._first(GetAddressTypes(self.auth, [address_type_id]),
                                       'UpdateAddressType - isequal')
            if address_type is None: continue
            self.isequal(address_type, address_type_fields, 'UpdateAddressType - isequal')

        # Check get all address types
        address_types = GetAddressTypes(self.auth, address_type_ids)
        if address_types is not None:
            self.islistequal(address_type_ids,
                             [address_type['address_type_id'] for address_type in address_types],
                             'GetAddressTypes - isequal')

        if self.config.verbose:
            utils.header("Added address types: %s" % address_type_ids)

        return address_type_ids

    def DeleteAddressTypes(self):
        """
        Delete every address type in self.address_type_ids and check
        that none is left.
        """
        DeleteAddressType = self.debug(self.api.DeleteAddressType)
        for address_type_id in self.address_type_ids:
            DeleteAddressType(self.auth, address_type_id)

        GetAddressTypes = self.debug(self.api.GetAddressTypes)
        address_types = GetAddressTypes(self.auth, self.address_type_ids)
        self.islistequal(address_types, [], 'DeleteAddressType - check')

        if self.config.verbose:
            utils.header("Deleted address types: %s" % self.address_type_ids)

        self.address_type_ids = []

    def Addresses(self, n = 3):
        """
        Add, read back, update and re-read n random addresses, each
        attached to a site picked at random from self.site_ids.

        Returns the list of address ids that were created.
        """
        # Needed after the loop as well, so wrap it up front
        GetAddresses = self.debug(self.api.GetAddresses)

        address_ids = []
        for i in range(n):
            address_fields = random_address()
            site_id = random.sample(self.site_ids, 1)[0]
            AddSiteAddress = self.debug(self.api.AddSiteAddress)
            address_id = AddSiteAddress(self.auth, site_id, address_fields)
            if address_id is None: continue

            # Should return a unique address_id
            self.isunique(address_id, address_ids, 'AddSiteAddress - isunique')
            address_ids.append(address_id)

            # Check address
            address = self._first(GetAddresses(self.auth, [address_id]), 'AddSiteAddress - isequal')
            if address is None: continue
            self.isequal(address, address_fields, 'AddSiteAddress - isequal')

            # Update address
            address_fields = random_address()
            UpdateAddress = self.debug(self.api.UpdateAddress)
            UpdateAddress(self.auth, address_id, address_fields)

            # Check again
            address = self._first(GetAddresses(self.auth, [address_id]), 'UpdateAddress - isequal')
            if address is None: continue
            self.isequal(address, address_fields, 'UpdateAddress - isequal')

        addresses = GetAddresses(self.auth, address_ids)
        if addresses is not None:
            self.islistequal(address_ids, [ad['address_id'] for ad in addresses], 'GetAddresses - isequal')

        if self.config.verbose:
            # Report the local list: self.address_ids is only set by the caller
            utils.header("Added addresses: %s" % address_ids)

        return address_ids

    def DeleteAddresses(self):
        """Delete every address in self.address_ids and check that none is left."""
        DeleteAddress = self.debug(self.api.DeleteAddress)
        # Delete site addresses
        for address_id in self.address_ids:
            DeleteAddress(self.auth, address_id)

        # Check
        GetAddresses = self.debug(self.api.GetAddresses)
        addresses = GetAddresses(self.auth, self.address_ids)
        self.islistequal(addresses, [], 'DeleteAddress - check')

        if self.config.verbose:
            utils.header("Deleted addresses: %s" % self.address_ids)

        self.address_ids = []

    def AddPersons(self, n = 3):
        """
        Create n random accounts and exercise them: read back, update,
        grant random roles, enable, attach to random sites from
        self.site_ids (which must not be empty) and set a primary site.

        Ids are appended to self.person_ids (created on demand), which
        is also returned. Every API call goes through debug() with
        self.auth as first argument, like the other tests in this class,
        and every check is recorded in methods_failed instead of being
        asserted.
        NOTE(review): this method is not invoked by call(); the argument
        order of the person-related API calls is carried over from the
        earlier, unported code -- confirm against the API reference.
        """
        GetRoles = self.debug(self.api.GetRoles)
        GetPersons = self.debug(self.api.GetPersons)
        UpdatePerson = self.debug(self.api.UpdatePerson)
        AuthCheck = self.debug(self.api.AuthCheck)

        def comparable(fields):
            # 'password' is excluded from read-back comparisons
            return dict([(key, value) for (key, value) in fields.items() if key != 'password'])

        # Map role name -> role_id
        roles = GetRoles(self.auth)
        if roles is None:
            roles = []
        role_ids = dict([(role['name'], role['role_id']) for role in roles])

        if not hasattr(self, 'person_ids'):
            self.person_ids = []

        for i in range(n):
            # Add account
            person_fields = random_person()
            AddPerson = self.debug(self.api.AddPerson)
            person_id = AddPerson(self.auth, person_fields)
            if person_id is None: continue

            # Should return a unique person_id
            self.isunique(person_id, self.person_ids, 'AddPerson - isunique')
            self.person_ids.append(person_id)

            # Check account
            person = self._first(GetPersons(self.auth, [person_id]), 'AddPerson - isequal')
            if person is None: continue
            self.isequal(person, comparable(person_fields), 'AddPerson - isequal')

            # Update account
            person_fields = random_person()
            UpdatePerson(self.auth, person_id, person_fields)

            # Check account again
            person = self._first(GetPersons(self.auth, [person_id]), 'UpdatePerson - isequal')
            if person is None: continue
            self.isequal(person, comparable(person_fields), 'UpdatePerson - isequal')

            person_auth = {'AuthMethod': "password",
                           'Username': person_fields['email'],
                           'AuthString': person_fields['password']}

            # random_person() creates accounts with 'enabled': False, so the
            # credentials must not be accepted yet. The call is not wrapped
            # with debug(): an error here is ignored, only a successful
            # authentication is recorded as a failure.
            try:
                if self.api.AuthCheck(person_auth):
                    self.methods_failed.add('AuthCheck - disabled account')
            except Exception:
                pass

            # Add random set of roles
            person_roles = random.sample(['user', 'pi', 'tech'], randint(1, 3))
            AddRoleToPerson = self.debug(self.api.AddRoleToPerson)
            for person_role in person_roles:
                if person_role not in role_ids:
                    # Role unknown to the API (or GetRoles failed)
                    self.methods_failed.add('AddRoleToPerson - unknown role')
                    continue
                AddRoleToPerson(self.auth, role_ids[person_role], person_id)

            person = self._first(GetPersons(self.auth, [person_id]), 'AddRoleToPerson - check')
            if person is not None:
                self.islistequal(person_roles, person.get('roles'), 'AddRoleToPerson - check')

            # Enable account
            UpdatePerson(self.auth, person_id, {'enabled': True})

            # Check that account is enabled
            if not AuthCheck(person_auth):
                self.methods_failed.add('AuthCheck - enabled account')

            # Associate account with random set of sites
            AddPersonToSite = self.debug(self.api.AddPersonToSite)
            person_site_ids = []
            for site_id in random.sample(self.site_ids, randint(1, len(self.site_ids))):
                AddPersonToSite(self.auth, person_id, site_id)
                person_site_ids.append(site_id)

            # Make sure it really did it
            person = self._first(GetPersons(self.auth, [person_id]), 'AddPersonToSite - check')
            if person is not None:
                self.islistequal(person_site_ids, person.get('site_ids'), 'AddPersonToSite - check')

            # Set a primary site
            primary_site_id = random.sample(person_site_ids, 1)[0]
            SetPersonPrimarySite = self.debug(self.api.SetPersonPrimarySite)
            SetPersonPrimarySite(self.auth, person_id, primary_site_id)

            # The primary site is expected first in the account's site list
            person = self._first(GetPersons(self.auth, [person_id]), 'SetPersonPrimarySite - check')
            if person is not None:
                first_site = (person.get('site_ids') or [])[:1]
                self.islistequal(first_site, [primary_site_id], 'SetPersonPrimarySite - check')

        if self.config.verbose:
            utils.header("Added users: %s" % self.person_ids)

        return self.person_ids

    def DeletePersons(self):
        """
        Detach every account in self.person_ids from the sites in
        self.site_ids, revoke its roles, disable it and delete it,
        checking each step. self.person_ids is emptied afterwards.

        Failed checks are recorded in methods_failed instead of being
        asserted.
        NOTE(review): the cleanup() call to this method is disabled; the
        argument order of the API calls is carried over from the
        earlier, unported code -- confirm against the API reference.
        """
        GetPersons = self.debug(self.api.GetPersons)
        DeletePersonFromSite = self.debug(self.api.DeletePersonFromSite)
        DeleteRoleFromPerson = self.debug(self.api.DeleteRoleFromPerson)
        UpdatePerson = self.debug(self.api.UpdatePerson)
        DeletePerson = self.debug(self.api.DeletePerson)

        # Delete users
        for person_id in self.person_ids:
            # Remove from each site
            for site_id in self.site_ids:
                DeletePersonFromSite(self.auth, person_id, site_id)

            person = self._first(GetPersons(self.auth, [person_id]), 'DeletePersonFromSite - check')
            if person is not None:
                self.islistequal(person.get('site_ids'), [], 'DeletePersonFromSite - check')

                # Revoke roles
                for role_id in person.get('role_ids') or []:
                    DeleteRoleFromPerson(self.auth, role_id, person_id)

            person = self._first(GetPersons(self.auth, [person_id]), 'DeleteRoleFromPerson - check')
            if person is not None:
                self.islistequal(person.get('role_ids'), [], 'DeleteRoleFromPerson - check')

            # Disable account
            UpdatePerson(self.auth, person_id, {'enabled': False})

            person = self._first(GetPersons(self.auth, [person_id]), 'UpdatePerson - disable check')
            if person is not None and person.get('enabled'):
                self.methods_failed.add('UpdatePerson - disable check')

            # Delete account
            DeletePerson(self.auth, person_id)
            self.islistequal(GetPersons(self.auth, [person_id]), [], 'DeletePerson - check')

        self.islistequal(GetPersons(self.auth, self.person_ids), [], 'DeletePerson - check')

        if self.config.verbose:
            utils.header("Deleted users: %s" % self.person_ids)

        self.person_ids = []
675
676
if __name__ == '__main__':
    # Command line arguments are handed on positionally (call() takes the
    # counts sites, nodes, ...). sys.argv holds strings, while the counts
    # end up in range(), so convert them to integers first.
    args = tuple([int(arg) for arg in sys.argv[1:]])
    api_unit_test()(*args)