# Standard library
import os
import sys
import threading
import time
import traceback

# Third-party
import httplib2
try:
FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
scope='https://www.googleapis.com/auth/bigquery')
+ BIGQUERY_AVAILABLE = True
except:
- print "exception while initializing bigquery flow"
+ print >> sys.stderr, "exception while initializing bigquery flow"
traceback.print_exc()
FLOW = None
+ BIGQUERY_AVAILABLE = False
MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000
raise Exception('Error accessing register allocations: %d'%resp.status_code)
def run_query_raw(self, query):
+ try:
+ file("/tmp/query_log","a").write("query %s\n" % query)
+ except:
+ pass
+
p = re.compile('%[a-zA-z_]*')
try:
self.reload_mapping()
query = p.sub(self.remap, query)
+ try:
+ file("/tmp/query_log","a").write("remapped query %s\n" % query)
+ except:
+ pass
+
storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
credentials = storage.get()
if credentials is None or credentials.invalid:
- credentials = run(FLOW, storage)
+ credentials = run(FLOW, storage)
http = httplib2.Http()
http = credentials.authorize(http)
service = build('bigquery', 'v2', http=http)
body = {"query": query,
- "timeoutMs": 30000}
+ "timeoutMs": 60000}
response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()
return response
field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"])
def run_query(self, query):
+ if not BIGQUERY_AVAILABLE:
+ print >> sys.stderr, "bigquery_analytics: bigquery flow is not available. returning empty result."
+ return []
+
response = self.run_query_raw(query)
fieldNames = []
new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k,0)))
for k in count:
- new_row["count_" + k] = new_row.get("count_" + k, 0) + 1
+ v = row.get(k,None)
+ dl = new_row["distinct_" + k] = new_row.get("distinct_" + k, [])
+ if (v not in dl):
+ dl.append(v)
+
+ #new_row["count_" + k] = new_row.get("count_" + k, 0) + 1
for row in new_rows.values():
for k in avg:
row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k]
del row["avg_base_" + k]
+ for k in count:
+ new_row["count_" + k] = len(new_row.get("distinct_" + k, []))
+
return new_rows.values()
def do_computed_fields(self, rows, computed=[]):
avg = [x.replace("%","") for x in avg]
computed = [x.replace("%","") for x in computed]
maxi = [x.replace("%","") for x in maxi]
+ groupBy = [x.replace("%","") for x in groupBy]
for (k,v) in filter.items():
rows = self.filter_results(rows, k, v)
- if maxDeltaTime is not None:
- maxTime = max([float(row["time"]) for row in rows])
- rows = [row for row in rows if float(row["time"])>=maxTime-maxDeltaTime]
+ if rows:
+ if maxDeltaTime is not None:
+ maxTime = max([float(row["time"]) for row in rows])
+ rows = [row for row in rows if float(row["time"])>=maxTime-maxDeltaTime]
(computedFieldNames, rows) = self.do_computed_fields(rows, computed)
sum = sum + computedFieldNames
- rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
+ if groupBy:
+ rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
return rows
def remap(self, match):