Added a test case for the generated REST API. Run as:
authorSapan Bhatia <gwsapan@gmail.com>
Fri, 20 Jun 2014 06:21:20 +0000 (02:21 -0400)
committerSapan Bhatia <gwsapan@gmail.com>
Fri, 20 Jun 2014 06:21:20 +0000 (02:21 -0400)
./manage.py test core

planetstack/core/tests.py [new file with mode: 0644]

diff --git a/planetstack/core/tests.py b/planetstack/core/tests.py
new file mode 100644 (file)
index 0000000..1c490c8
--- /dev/null
@@ -0,0 +1,128 @@
+#!/usr/bin/python
+from django.test import TestCase
+from core.models import *
+from rest_framework.test import *
+from genapi import *
+import json
+from datetime import datetime
+
+FIXTURES_FILE = 'core/fixtures/initial_data.json'
+MODELS = ['Deployment','Image','Node','Reservation','Slice','Sliver','User']
+
+def is_dynamic_type(x):
+       t = type(x)
+       return t in [datetime]
+
+class APITestCase(TestCase):
+       def setUp(self):
+               self.init_data=json.loads(open(FIXTURES_FILE).read())
+               self.data_dict={}
+               self.hidden_keys={}
+
+               for d in self.init_data:
+                       model_tag = d['model']
+                       model_name = model_tag.split('.')[1]
+
+                       try:
+                               self.data_dict[model_name].append(d)
+                       except:
+                               self.data_dict[model_name]=[d]
+
+               # Any admin user would do
+               self.calling_user = User('sapan@onlab.us')
+               self.client = APIClient()
+               self.client.force_authenticate(user=self.calling_user)
+
+
+       def check_items(self, response, data_list):
+               rdict = {}
+               for r in response:
+                       rdict['%d'%r['id']]=r
+
+               for d in data_list:
+                       match = True
+                       try:
+                               item = rdict['%d'%d['pk']]
+                       except Exception,e:
+                               print 'API missing item %d / %r'%(d['pk'],rdict.keys())
+                               raise e
+
+                       fields=d['fields']
+                       fields['id']=d['pk']
+
+                       for k in item.keys():
+                               try:
+                                       resp_val = fields[k]
+                               except KeyError:
+                                       if (not self.hidden_keys.has_key(k)):
+                                               print 'Hidden key %s'%k
+                                               self.hidden_keys[k]=True
+
+                                       continue
+
+                               if (item[k]!=resp_val and not is_dynamic_type(item[k])):
+                                       if (type(resp_val)==type(item[k])):
+                                               print 'Key %s did not match: 1. %r 2. %r'%(k,item[k],resp_val)
+                                               print fields
+                                               match = False
+
+
+
+       def create(self, model, mplural, record):
+               request = self.client.put('/plstackapi/%s/'%mplural,record['fields'])
+
+               #if (len2==len1):
+               #       raise Exception('Could not delete %s/%d'%(model,pk))
+
+               return
+
+       def update(self, model, mplural, pk):
+               src_record = self.data_dict[model.lower()][0]
+               record_to_update = src_record['fields']
+               now = datetime.now()
+               record_to_update['enacted']=now
+               response = self.client.put('/plstackapi/%s/%d/'%(mplural,pk),record_to_update)
+               self.assertEqual(response.data['enacted'],now)
+
+               return
+
+       def delete(self, model, mplural, pk):
+               mclass = globals()[model]
+               len1 = len(mclass.objects.all())
+               response = self.client.delete('/plstackapi/%s/%d/'%(mplural,pk))
+               len2 = len(mclass.objects.all())
+               self.assertNotEqual(len1,len2)
+
+               return
+
+       def retrieve(self, m, mplural, mlower):
+               response = self.client.get('/plstackapi/%s/'%mplural)
+               #force_authenticate(request,user=self.calling_user)
+               self.check_items(response.data,self.data_dict[mlower])
+
+               return
+
+       def test_initial_retrieve(self):
+               for m in MODELS:
+                       print 'Checking retrieve on %s...'%m
+                       self.retrieve(m, m.lower()+'s',m.lower())
+
+       
+       def test_update(self):
+               for m in MODELS:
+                       print 'Checking update on %s...'%m
+                       first = self.data_dict[m.lower()][0]['pk']
+                       self.update(m, m.lower()+'s',int(first))
+       
+       def test_delete(self):
+               for m in MODELS:
+                       print 'Checking delete on %s...'%m
+                       first = self.data_dict[m.lower()][0]['pk']
+                       self.delete(m, m.lower()+'s',int(first))
+
+       def test_create(self):
+               for m in MODELS:
+                       print 'Checking create on %s...'%m
+                       first = self.data_dict[m.lower()][0]
+                       self.create(m, m.lower()+'s',first)
+