""" useraccesstest.py This is a basic REST API permission test. Call it with a username and a password, and it will try to read and update some user and slice object, and report if something is broken. This is not an exhaustive test. """ import inspect import json import os import requests import sys from operator import itemgetter, attrgetter REST_API="http://node43.princeton.vicci.org:8000/plstackapi/" USERS_API = REST_API + "users/" SLICES_API = REST_API + "slices/" SITES_API = REST_API + "sites/" SITEPRIV_API = REST_API + "site_privileges/" SLICEPRIV_API = REST_API + "slice_memberships/" SITEROLE_API = REST_API + "site_roles/" SLICEROLE_API = REST_API + "slice_roles/" TEST_USER_EMAIL = "test1234@test.com" username = sys.argv[1] password = sys.argv[2] opencloud_auth=(username, password) admin_auth=("scott@onlab.us", "letmein") # admin creds, used to get full set of objects def fail_unless(x, msg): if not x: (frame, filename, line_number, function_name, lines, index) = inspect.getouterframes(inspect.currentframe())[1] print "FAIL (%s:%d)" % (function_name, line_number), msg print "downloading objects using admin" r = requests.get(USERS_API + "?no_hyperlinks=1", auth=admin_auth) allUsers = r.json() r = requests.get(SLICES_API + "?no_hyperlinks=1", auth=admin_auth) allSlices = r.json() r = requests.get(SITES_API + "?no_hyperlinks=1", auth=admin_auth) allSites = r.json() r = requests.get(SITEPRIV_API + "?no_hyperlinks=1", auth=admin_auth) allSitePriv = r.json() r = requests.get(SLICEPRIV_API + "?no_hyperlinks=1", auth=admin_auth) allSlicePriv = r.json() r = requests.get(SITEROLE_API + "?no_hyperlinks=1", auth=admin_auth) allSiteRole = r.json() r = requests.get(SLICEROLE_API + "?no_hyperlinks=1", auth=admin_auth) allSliceRole = r.json() def should_see_user(myself, otherUser): if myself["is_admin"]: return True if myself["id"] == otherUser["id"]: return True for sitePriv in allSitePriv: if (sitePriv["user"] == myself["id"]) and (sitePriv["site"] == otherUser["site"]): for role in allSiteRole: if role["role"]=="pi" and role["id"] == sitePriv["role"]: return True return False def should_see_slice(myself, slice): if myself["is_admin"]: return True for sitePriv in allSitePriv: if (sitePriv["user"] == myself["id"]) and (sitePriv["site"] == slice["site"]): for role in allSiteRole: if role["role"]=="pi" and role["id"] == sitePriv["role"]: return True for slicePriv in allSlicePriv: if (slicePriv["user"] == myself["id"]) and (sitePriv["slice"] == slice["id"]): for role in allSliceRole: if role["role"]=="pi" and role["id"] == slicePriv["role"]: return True return False def flip_phone(user): if user["phone"] == "123": user["phone"] = "456" else: user["phone"] = "123" def flip_desc(slice): if slice["description"] == "some_description": slice["description"] = "some_other_description" else: slice["description"] = "some_description" def delete_user_if_exists(email): r = requests.get(USERS_API +"?email=%s" % email, auth=admin_auth) if r.status_code==200: user = r.json() if len(user)>0: user=user[0] r = requests.delete(USERS_API + str(user["id"]) + "/", auth=admin_auth) fail_unless(r.status_code==200, "failed to delete the test user") print " loaded user:%d slice:%d, site:%d, site_priv:%d slice_priv:%d" % (len(allUsers), len(allSlices), len(allSites), len(allSitePriv), len(allSlicePriv)) # get our own user record r = requests.get(USERS_API + "?email=%s&no_hyperlinks" % username, auth=opencloud_auth) fail_unless(r.status_code==200, "failed to get user %s" % username) myself = r.json() fail_unless(len(myself)==1, "wrong number of results when getting user %s" % username) myself = myself[0] # check to see that we see the users we should be able to r = requests.get(USERS_API, auth=opencloud_auth) myUsers = r.json() for user in myUsers: fail_unless(should_see_user(myself, user), "saw user %s but we shouldn't have" % user["email"]) myUsersIds = [r["id"] for r in myUsers] for user in allUsers: if should_see_user(myself, user): fail_unless(user["id"] in myUsersIds, "should have seen user %s but didnt" % user["email"]) # toggle the phone number on the users we should be able to """ for user in allUsers: user = requests.get(USERS_API + str(user["id"]) + "/", auth=admin_auth).json() flip_phone(user) r = requests.put(USERS_API + str(user["id"]) +"/", data=user, auth=opencloud_auth) if should_see_user(myself, user): fail_unless(r.status_code==200, "failed to change phone number on %s" % user["email"]) else: # XXX: this is failing, but for the wrong reason fail_unless(r.status_code!=200, "was able to change phone number on %s but shouldn't have" % user["email"]) # try changing is_staff. We should be able to do it if we're an admin, but not # otherwise. for user in allUsers: user = requests.get(USERS_API + str(user["id"]) + "/", auth=admin_auth).json() user["is_staff"] = not user["is_staff"] r = requests.put(USERS_API + str(user["id"]) +"/", data=user, auth=opencloud_auth) if myself["is_admin"]: fail_unless(r.status_code==200, "failed to change is_staff on %s" % user["email"]) else: # XXX: this is failing, but for the wrong reason fail_unless(r.status_code!=200, "was able to change is_staff on %s but shouldn't have" % user["email"]) # put it back to false, in case we successfully changed it... user["is_staff"] = False r = requests.put(USERS_API + str(user["id"]) +"/", data=user, auth=opencloud_auth) """ # delete the TEST_USER_EMAIL if it exists delete_user_if_exists(TEST_USER_EMAIL) newUser = {"firstname": "test", "lastname": "1234", "email": TEST_USER_EMAIL, "password": "letmein"} r = requests.post(USERS_API, data=newUser, auth=opencloud_auth) if myself["is_admin"]: fail_unless(r.status_code==200, "failed to create %s" % TEST_USER_EMAIL) else: fail_unless(r.status_code!=200, "created %s but we shouldn't have been able to" % TEST_USER_EMAIL) delete_user_if_exists(TEST_USER_EMAIL) sys.exit(-1) # now create it as admin r = requests.post(USERS_API, data=newUser, auth=admin_auth) fail_unless(r.status_code==201, "failed to create %s as admin" % TEST_USER_EMAIL) user = requests.get(USERS_API +"?email=%s" % TEST_USER_EMAIL, auth=admin_auth).json()[0] r = requests.delete(USERS_API + str(user["id"]) + "/", auth=opencloud_auth) if myself["is_admin"]: fail_unless(r.status_code==200, "failed to delete %s" % TEST_USER_EMAIL) else: fail_unless(r.status_code!=200, "deleted %s but we shouldn't have been able to" % TEST_USER_EMAIL) # slice tests r = requests.get(SLICES_API, auth=opencloud_auth) mySlices = r.json() for slice in mySlices: fail_unless(should_see_slice(myself, slice), "saw slice %s but we shouldn't have" % slice["name"]) mySlicesIds = [r["id"] for r in mySlices] for slice in allSlices: if should_see_slice(myself, slice): fail_unless(slice["id"] in mySliceIds, "should have seen slice %s but didnt" % slice["name"]) for slice in allSlices: slice = requests.get(SLICES_API + str(slice["id"]) + "/", auth=admin_auth).json() flip_desc(slice) r = requests.put(SLICES_API + str(slice["id"]) +"/", data=slice, auth=opencloud_auth) if should_see_slice(myself, slice): fail_unless(r.status_code==200, "failed to change desc on %s" % slice["name"]) else: fail_unless(r.status_code!=200, "was able to change desc on %s but shouldn't have" % slice["name"]) print "Done."