users can't set/unset site in Login Details without the proper authorization
[plstackapi.git] / planetstack / hpc_wizard / bigquery_analytics.py
index 2e65707..4a90c2b 100644 (file)
@@ -6,6 +6,7 @@ import json
 import httplib2
 import threading
 import os
+import sys
 import time
 import traceback
 
@@ -28,10 +29,12 @@ PROJECT_NUMBER = '549187599759'
 try:
     FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
                                    scope='https://www.googleapis.com/auth/bigquery')
+    BIGQUERY_AVAILABLE = True
 except:
-    print "exception while initializing bigquery flow"
+    print >> sys.stderr, "exception while initializing bigquery flow"
     traceback.print_exc()
     FLOW = None
+    BIGQUERY_AVAILABLE = False
 
 MINUTE_MS = 60*1000
 HOUR_MS = 60*60*1000
@@ -71,6 +74,11 @@ class BigQueryAnalytics:
                raise Exception('Error accessing register allocations: %d'%resp.status_code)
 
     def run_query_raw(self, query):
+        try:
+            file("/tmp/query_log","a").write("query %s\n" % query)
+        except:
+            pass
+
         p = re.compile('%[a-zA-z_]*')
 
         try:
@@ -79,11 +87,16 @@ class BigQueryAnalytics:
             self.reload_mapping()
             query = p.sub(self.remap, query)
 
+        try:
+            file("/tmp/query_log","a").write("remapped query %s\n" % query)
+        except:
+            pass
+
        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()
 
        if credentials is None or credentials.invalid:
-               credentials = run(FLOW, storage)
+            credentials = run(FLOW, storage)
 
        http = httplib2.Http()
        http = credentials.authorize(http)
@@ -91,7 +104,7 @@ class BigQueryAnalytics:
        service = build('bigquery', 'v2', http=http)
 
         body = {"query": query,
-                "timeoutMs": 30000}
+                "timeoutMs": 60000}
         response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()
 
         return response
@@ -101,6 +114,10 @@ class BigQueryAnalytics:
             field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"])
 
     def run_query(self, query):
+        if not BIGQUERY_AVAILABLE:
+            print >> sys.stderr, "bigquery_analytics: bigquery flow is not available. returning empty result."
+            return []
+
         response = self.run_query_raw(query)
 
         fieldNames = []
@@ -155,13 +172,21 @@ class BigQueryAnalytics:
                 new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k,0)))
 
             for k in count:
-                new_row["count_" + k] = new_row.get("count_" + k, 0) + 1
+                v = row.get(k,None)
+                dl = new_row["distinct_" + k] = new_row.get("distinct_" + k, [])
+                if (v not in dl):
+                    dl.append(v)
+
+                #new_row["count_" + k] = new_row.get("count_" + k, 0) + 1
 
         for row in new_rows.values():
             for k in avg:
                 row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k]
                 del row["avg_base_" + k]
 
+            for k in count:
+                new_row["count_" + k] = len(new_row.get("distinct_" + k, []))
+
         return new_rows.values()
 
     def do_computed_fields(self, rows, computed=[]):
@@ -186,17 +211,20 @@ class BigQueryAnalytics:
         avg = [x.replace("%","") for x in avg]
         computed = [x.replace("%","") for x in computed]
         maxi = [x.replace("%","") for x in maxi]
+        groupBy = [x.replace("%","") for x in groupBy]
 
         for (k,v) in filter.items():
             rows = self.filter_results(rows, k, v)
 
-        if maxDeltaTime is not None:
-            maxTime = max([float(row["time"]) for row in rows])
-            rows = [row for row in rows if float(row["time"])>=maxTime-maxDeltaTime]
+        if rows:
+            if maxDeltaTime is not None:
+                maxTime = max([float(row["time"]) for row in rows])
+                rows = [row for row in rows if float(row["time"])>=maxTime-maxDeltaTime]
 
         (computedFieldNames, rows) = self.do_computed_fields(rows, computed)
         sum = sum + computedFieldNames
-        rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
+        if groupBy:
+            rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
         return rows
 
     def remap(self, match):