3 This is a basic REST API permission test. Call it with a username and a
4 password, and it will try to read and update some user and slice object,
5 and report if something is broken.
7 This is not an exhaustive test.
17 from operator import itemgetter, attrgetter
19 REST_API="http://node43.princeton.vicci.org:8000/plstackapi/"
20 USERS_API = REST_API + "users/"
21 SLICES_API = REST_API + "slices/"
22 SITES_API = REST_API + "sites/"
23 SITEPRIV_API = REST_API + "site_privileges/"
24 SLICEPRIV_API = REST_API + "slice_memberships/"
25 SITEROLE_API = REST_API + "site_roles/"
26 SLICEROLE_API = REST_API + "slice_roles/"
28 TEST_USER_EMAIL = "test1234@test.com"
30 username = sys.argv[1]
31 password = sys.argv[2]
33 opencloud_auth=(username, password)
34 admin_auth=("scott@onlab.us", "letmein") # admin creds, used to get full set of objects
36 def fail_unless(x, msg):
38 (frame, filename, line_number, function_name, lines, index) = inspect.getouterframes(inspect.currentframe())[1]
39 print "FAIL (%s:%d)" % (function_name, line_number), msg
42 print "downloading objects using admin"
43 r = requests.get(USERS_API + "?no_hyperlinks=1", auth=admin_auth)
45 r = requests.get(SLICES_API + "?no_hyperlinks=1", auth=admin_auth)
47 r = requests.get(SITES_API + "?no_hyperlinks=1", auth=admin_auth)
49 r = requests.get(SITEPRIV_API + "?no_hyperlinks=1", auth=admin_auth)
50 allSitePriv = r.json()
51 r = requests.get(SLICEPRIV_API + "?no_hyperlinks=1", auth=admin_auth)
52 allSlicePriv = r.json()
53 r = requests.get(SITEROLE_API + "?no_hyperlinks=1", auth=admin_auth)
54 allSiteRole = r.json()
55 r = requests.get(SLICEROLE_API + "?no_hyperlinks=1", auth=admin_auth)
56 allSliceRole = r.json()
58 def should_see_user(myself, otherUser):
59 if myself["is_admin"]:
61 if myself["id"] == otherUser["id"]:
63 for sitePriv in allSitePriv:
64 if (sitePriv["user"] == myself["id"]) and (sitePriv["site"] == otherUser["site"]):
65 for role in allSiteRole:
66 if role["role"]=="pi" and role["id"] == sitePriv["role"]:
70 def should_see_slice(myself, slice):
71 if myself["is_admin"]:
73 for sitePriv in allSitePriv:
74 if (sitePriv["user"] == myself["id"]) and (sitePriv["site"] == slice["site"]):
75 for role in allSiteRole:
76 if role["role"]=="pi" and role["id"] == sitePriv["role"]:
78 for slicePriv in allSlicePriv:
79 if (slicePriv["user"] == myself["id"]) and (sitePriv["slice"] == slice["id"]):
80 for role in allSliceRole:
81 if role["role"]=="pi" and role["id"] == slicePriv["role"]:
86 if user["phone"] == "123":
92 if slice["description"] == "some_description":
93 slice["description"] = "some_other_description"
95 slice["description"] = "some_description"
97 def delete_user_if_exists(email):
98 r = requests.get(USERS_API +"?email=%s" % email, auth=admin_auth)
99 if r.status_code==200:
103 r = requests.delete(USERS_API + str(user["id"]) + "/", auth=admin_auth)
104 fail_unless(r.status_code==200, "failed to delete the test user")
106 print " loaded user:%d slice:%d, site:%d, site_priv:%d slice_priv:%d" % (len(allUsers), len(allSlices), len(allSites), len(allSitePriv), len(allSlicePriv))
108 # get our own user record
110 r = requests.get(USERS_API + "?email=%s&no_hyperlinks" % username, auth=opencloud_auth)
111 fail_unless(r.status_code==200, "failed to get user %s" % username)
113 fail_unless(len(myself)==1, "wrong number of results when getting user %s" % username)
116 # check to see that we see the users we should be able to
118 r = requests.get(USERS_API, auth=opencloud_auth)
121 fail_unless(should_see_user(myself, user), "saw user %s but we shouldn't have" % user["email"])
122 myUsersIds = [r["id"] for r in myUsers]
123 for user in allUsers:
124 if should_see_user(myself, user):
125 fail_unless(user["id"] in myUsersIds, "should have seen user %s but didnt" % user["email"])
127 # toggle the phone number on the users we should be able to
130 for user in allUsers:
131 user = requests.get(USERS_API + str(user["id"]) + "/", auth=admin_auth).json()
133 r = requests.put(USERS_API + str(user["id"]) +"/", data=user, auth=opencloud_auth)
134 if should_see_user(myself, user):
135 fail_unless(r.status_code==200, "failed to change phone number on %s" % user["email"])
137 # XXX: this is failing, but for the wrong reason
138 fail_unless(r.status_code!=200, "was able to change phone number on %s but shouldn't have" % user["email"])
140 # try changing is_staff. We should be able to do it if we're an admin, but not
143 for user in allUsers:
144 user = requests.get(USERS_API + str(user["id"]) + "/", auth=admin_auth).json()
145 user["is_staff"] = not user["is_staff"]
146 r = requests.put(USERS_API + str(user["id"]) +"/", data=user, auth=opencloud_auth)
147 if myself["is_admin"]:
148 fail_unless(r.status_code==200, "failed to change is_staff on %s" % user["email"])
150 # XXX: this is failing, but for the wrong reason
151 fail_unless(r.status_code!=200, "was able to change is_staff on %s but shouldn't have" % user["email"])
153 # put it back to false, in case we successfully changed it...
154 user["is_staff"] = False
155 r = requests.put(USERS_API + str(user["id"]) +"/", data=user, auth=opencloud_auth)
158 # delete the TEST_USER_EMAIL if it exists
159 delete_user_if_exists(TEST_USER_EMAIL)
161 newUser = {"firstname": "test", "lastname": "1234", "email": TEST_USER_EMAIL, "password": "letmein"}
162 r = requests.post(USERS_API, data=newUser, auth=opencloud_auth)
163 if myself["is_admin"]:
164 fail_unless(r.status_code==200, "failed to create %s" % TEST_USER_EMAIL)
166 fail_unless(r.status_code!=200, "created %s but we shouldn't have been able to" % TEST_USER_EMAIL)
168 delete_user_if_exists(TEST_USER_EMAIL)
173 # now create it as admin
174 r = requests.post(USERS_API, data=newUser, auth=admin_auth)
175 fail_unless(r.status_code==201, "failed to create %s as admin" % TEST_USER_EMAIL)
177 user = requests.get(USERS_API +"?email=%s" % TEST_USER_EMAIL, auth=admin_auth).json()[0]
178 r = requests.delete(USERS_API + str(user["id"]) + "/", auth=opencloud_auth)
179 if myself["is_admin"]:
180 fail_unless(r.status_code==200, "failed to delete %s" % TEST_USER_EMAIL)
182 fail_unless(r.status_code!=200, "deleted %s but we shouldn't have been able to" % TEST_USER_EMAIL)
186 r = requests.get(SLICES_API, auth=opencloud_auth)
189 for slice in mySlices:
190 fail_unless(should_see_slice(myself, slice), "saw slice %s but we shouldn't have" % slice["name"])
191 mySlicesIds = [r["id"] for r in mySlices]
192 for slice in allSlices:
193 if should_see_slice(myself, slice):
194 fail_unless(slice["id"] in mySliceIds, "should have seen slice %s but didnt" % slice["name"])
196 for slice in allSlices:
197 slice = requests.get(SLICES_API + str(slice["id"]) + "/", auth=admin_auth).json()
199 r = requests.put(SLICES_API + str(slice["id"]) +"/", data=slice, auth=opencloud_auth)
200 if should_see_slice(myself, slice):
201 fail_unless(r.status_code==200, "failed to change desc on %s" % slice["name"])
203 fail_unless(r.status_code!=200, "was able to change desc on %s but shouldn't have" % slice["name"])