user permission REST test, WIP
[plstackapi.git] / planetstack / tests / useraccesstest.py
1 """ useraccesstest.py
2
3     This is a basic REST API permission test. Call it with a username and a
4     password, and it will try to read and update some user and slice object,
5     and report if something is broken.
6
7     This is not an exhaustive test.
8 """
9
10
11 import inspect
12 import json
13 import os
14 import requests
15 import sys
16
17 from operator import itemgetter, attrgetter
18
19 REST_API="http://node43.princeton.vicci.org:8000/plstackapi/"
20 USERS_API = REST_API + "users/"
21 SLICES_API = REST_API + "slices/"
22 SITES_API = REST_API + "sites/"
23 SITEPRIV_API = REST_API + "site_privileges/"
24 SLICEPRIV_API = REST_API + "slice_memberships/"
25 SITEROLE_API = REST_API + "site_roles/"
26 SLICEROLE_API = REST_API + "slice_roles/"
27
28 TEST_USER_EMAIL = "test1234@test.com"
29
30 username = sys.argv[1]
31 password = sys.argv[2]
32
33 opencloud_auth=(username, password)
34 admin_auth=("scott@onlab.us", "letmein")   # admin creds, used to get full set of objects
35
36 def fail_unless(x, msg):
37     if not x:
38         (frame, filename, line_number, function_name, lines, index) = inspect.getouterframes(inspect.currentframe())[1]
39         print "FAIL (%s:%d)" % (function_name, line_number), msg
40
41
42 print "downloading objects using admin"
43 r = requests.get(USERS_API + "?no_hyperlinks=1", auth=admin_auth)
44 allUsers = r.json()
45 r = requests.get(SLICES_API + "?no_hyperlinks=1", auth=admin_auth)
46 allSlices = r.json()
47 r = requests.get(SITES_API + "?no_hyperlinks=1", auth=admin_auth)
48 allSites = r.json()
49 r = requests.get(SITEPRIV_API + "?no_hyperlinks=1", auth=admin_auth)
50 allSitePriv = r.json()
51 r = requests.get(SLICEPRIV_API + "?no_hyperlinks=1", auth=admin_auth)
52 allSlicePriv = r.json()
53 r = requests.get(SITEROLE_API + "?no_hyperlinks=1", auth=admin_auth)
54 allSiteRole = r.json()
55 r = requests.get(SLICEROLE_API + "?no_hyperlinks=1", auth=admin_auth)
56 allSliceRole = r.json()
57
58 def should_see_user(myself, otherUser):
59     if myself["is_admin"]:
60         return True
61     if myself["id"] == otherUser["id"]:
62         return True
63     for sitePriv in allSitePriv:
64         if (sitePriv["user"] == myself["id"]) and (sitePriv["site"] == otherUser["site"]):
65             for role in allSiteRole:
66                 if role["role"]=="pi" and role["id"] == sitePriv["role"]:
67                     return True
68     return False
69
70 def should_see_slice(myself, slice):
71     if myself["is_admin"]:
72         return True
73     for sitePriv in allSitePriv:
74         if (sitePriv["user"] == myself["id"]) and (sitePriv["site"] == slice["site"]):
75             for role in allSiteRole:
76                 if role["role"]=="pi" and role["id"] == sitePriv["role"]:
77                     return True
78     for slicePriv in allSlicePriv:
79         if (slicePriv["user"] == myself["id"]) and (sitePriv["slice"] == slice["id"]):
80             for role in allSliceRole:
81                 if role["role"]=="pi" and role["id"] == slicePriv["role"]:
82                     return True
83     return False
84
85 def flip_phone(user):
86     if user["phone"] == "123":
87         user["phone"] = "456"
88     else:
89         user["phone"] = "123"
90
91 def flip_desc(slice):
92     if slice["description"] == "some_description":
93         slice["description"] = "some_other_description"
94     else:
95         slice["description"] = "some_description"
96
97 def delete_user_if_exists(email):
98     r = requests.get(USERS_API +"?email=%s" % email, auth=admin_auth)
99     if r.status_code==200:
100         user = r.json()
101         if len(user)>0:
102             user=user[0]
103             r = requests.delete(USERS_API + str(user["id"]) + "/", auth=admin_auth)
104             fail_unless(r.status_code==200, "failed to delete the test user")
105
106 print "  loaded user:%d slice:%d, site:%d, site_priv:%d slice_priv:%d" % (len(allUsers), len(allSlices), len(allSites), len(allSitePriv), len(allSlicePriv))
107
108 # get our own user record
109
110 r = requests.get(USERS_API + "?email=%s&no_hyperlinks" % username, auth=opencloud_auth)
111 fail_unless(r.status_code==200, "failed to get user %s" % username)
112 myself = r.json()
113 fail_unless(len(myself)==1, "wrong number of results when getting user %s" % username)
114 myself = myself[0]
115
116 # check to see that we see the users we should be able to
117
118 r = requests.get(USERS_API, auth=opencloud_auth)
119 myUsers = r.json()
120 for user in myUsers:
121     fail_unless(should_see_user(myself, user), "saw user %s but we shouldn't have" % user["email"])
122 myUsersIds = [r["id"] for r in myUsers]
123 for user in allUsers:
124     if should_see_user(myself, user):
125         fail_unless(user["id"] in myUsersIds, "should have seen user %s but didnt" % user["email"])
126
127 # toggle the phone number on the users we should be able to
128
129 """
130 for user in allUsers:
131     user = requests.get(USERS_API + str(user["id"]) + "/", auth=admin_auth).json()
132     flip_phone(user)
133     r = requests.put(USERS_API + str(user["id"]) +"/", data=user, auth=opencloud_auth)
134     if should_see_user(myself, user):
135         fail_unless(r.status_code==200, "failed to change phone number on %s" % user["email"])
136     else:
137         # XXX: this is failing, but for the wrong reason
138         fail_unless(r.status_code!=200, "was able to change phone number on %s but shouldn't have" % user["email"])
139
140 # try changing is_staff. We should be able to do it if we're an admin, but not
141 # otherwise.
142
143 for user in allUsers:
144     user = requests.get(USERS_API + str(user["id"]) + "/", auth=admin_auth).json()
145     user["is_staff"] = not user["is_staff"]
146     r = requests.put(USERS_API + str(user["id"]) +"/", data=user, auth=opencloud_auth)
147     if myself["is_admin"]:
148         fail_unless(r.status_code==200, "failed to change is_staff on %s" % user["email"])
149     else:
150         # XXX: this is failing, but for the wrong reason
151         fail_unless(r.status_code!=200, "was able to change is_staff on %s but shouldn't have" % user["email"])
152
153     # put it back to false, in case we successfully changed it...
154     user["is_staff"] = False
155     r = requests.put(USERS_API + str(user["id"]) +"/", data=user, auth=opencloud_auth)
156 """
157
158 # delete the TEST_USER_EMAIL if it exists
159 delete_user_if_exists(TEST_USER_EMAIL)
160
161 newUser = {"firstname": "test", "lastname": "1234", "email": TEST_USER_EMAIL, "password": "letmein"}
162 r = requests.post(USERS_API, data=newUser, auth=opencloud_auth)
163 if myself["is_admin"]:
164     fail_unless(r.status_code==200, "failed to create %s" % TEST_USER_EMAIL)
165 else:
166     fail_unless(r.status_code!=200, "created %s but we shouldn't have been able to" % TEST_USER_EMAIL)
167
168 delete_user_if_exists(TEST_USER_EMAIL)
169
170 sys.exit(-1)
171
172
173 # now create it as admin
174 r = requests.post(USERS_API, data=newUser, auth=admin_auth)
175 fail_unless(r.status_code==201, "failed to create %s as admin" % TEST_USER_EMAIL)
176
177 user = requests.get(USERS_API +"?email=%s" % TEST_USER_EMAIL, auth=admin_auth).json()[0]
178 r = requests.delete(USERS_API + str(user["id"]) + "/", auth=opencloud_auth)
179 if myself["is_admin"]:
180     fail_unless(r.status_code==200, "failed to delete %s" % TEST_USER_EMAIL)
181 else:
182     fail_unless(r.status_code!=200, "deleted %s but we shouldn't have been able to" % TEST_USER_EMAIL)
183
184 # slice tests
185
186 r = requests.get(SLICES_API, auth=opencloud_auth)
187 mySlices = r.json()
188
189 for slice in mySlices:
190     fail_unless(should_see_slice(myself, slice), "saw slice %s but we shouldn't have" % slice["name"])
191 mySlicesIds = [r["id"] for r in mySlices]
192 for slice in allSlices:
193     if should_see_slice(myself, slice):
194         fail_unless(slice["id"] in mySliceIds, "should have seen slice %s but didnt" % slice["name"])
195
196 for slice in allSlices:
197     slice = requests.get(SLICES_API + str(slice["id"]) + "/", auth=admin_auth).json()
198     flip_desc(slice)
199     r = requests.put(SLICES_API + str(slice["id"]) +"/", data=slice, auth=opencloud_auth)
200     if should_see_slice(myself, slice):
201         fail_unless(r.status_code==200, "failed to change desc on %s" % slice["name"])
202     else:
203         fail_unless(r.status_code!=200, "was able to change desc on %s but shouldn't have" % slice["name"])
204
205 print "Done."