change compose_query to use a filter dictionary, add queryspec to compose_cached_query
[plstackapi.git] / planetstack / hpc_wizard / bigquery_analytics.py
1 import re
2 import base64
3 import requests
4 import urllib
5 import json
6 import httplib2
7 import threading
8 import os
9 import time
10 import traceback
11
12 from apiclient.discovery import build
13 from apiclient.errors import HttpError
14 from oauth2client.client import AccessTokenRefreshError
15 from oauth2client.client import OAuth2WebServerFlow
16 from oauth2client.client import flow_from_clientsecrets
17 from oauth2client.file import Storage
18 from oauth2client.tools import run_flow,run
19
20 """
21 yum -y install python-httplib2
22 easy_install python_gflags
23 easy_install google_api_python_client
24 """
25
# Project id passed as projectId to the BigQuery jobs().query() call.
PROJECT_NUMBER = '549187599759'

# Build the OAuth2 flow once at import time.  A failure here (missing secrets
# file, missing oauth2client, ...) must not prevent the module from being
# imported, so it is reported and FLOW is left as None.
try:
    FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
                                   scope='https://www.googleapis.com/auth/bigquery')
except Exception:
    # "except Exception" rather than a bare except so that SystemExit and
    # KeyboardInterrupt raised during import are not swallowed.
    print("exception while initializing bigquery flow")
    traceback.print_exc()
    FLOW = None

# Durations expressed in milliseconds.
MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000

# Module-wide caches, keyed by table name:
#   mappings[table]         maps a query token to the name substituted for it
#   reverse_mappings[table] is the inverse of mappings[table]
mappings = {}
reverse_mappings = {}
42
def to_number(s):
    """Convert *s* to a number, returning 0 when it cannot be converted.

    A value whose string form contains a "." is converted with float();
    anything else is converted with int().

    Args:
        s: the value to convert (typically a string taken from a query row).

    Returns:
        An int or a float, or 0 if the conversion fails.
    """
    try:
        if "." in str(s):
            return float(s)
        return int(s)
    except (TypeError, ValueError, OverflowError):
        # TypeError: e.g. None; ValueError: non-numeric text or NaN;
        # OverflowError: int() of an infinite float.  Anything else (such as
        # KeyboardInterrupt) is deliberately allowed to propagate.
        return 0
51
class MappingException(Exception):
    """Raised when a query token cannot be resolved through the cached mapping."""
54
class BigQueryAnalytics:
    """Run BigQuery queries whose column names are written as %tokens.

    Queries may contain placeholders such as "%hostname".  Before a query is
    sent, each placeholder is replaced using a per-table mapping fetched over
    HTTP and cached in the module-level ``mappings`` dict; result column names
    are translated back through ``reverse_mappings``.
    """

    # Matches a "%token" placeholder.  The character class is A-Z, not A-z:
    # the latter range also covers the punctuation "[", "\", "]", "^" and "`".
    _TOKEN_RE = re.compile('%[a-zA-Z_]*')

    def __init__(self, table = "demoevents"):
        """Remember which table's mapping this instance works with."""
        self.projectName = "vicci"
        self.tableName = table

    def _reverse_mapping(self):
        """Return the cached reverse mapping for this table.

        Returns an empty dict when no mapping has been loaded yet, so that
        callers fall back to the untranslated field names instead of raising
        KeyError.
        """
        return reverse_mappings.get(self.tableName, {})

    def _log_query(self, label, query):
        """Append "<label> <query>" to /tmp/query_log on a best-effort basis."""
        try:
            with open("/tmp/query_log", "a") as log:
                log.write("%s %s\n" % (label, query))
        except Exception:
            # The log is purely diagnostic; a logging failure must never
            # prevent the query from running.
            pass

    def reload_mapping(self):
        """Fetch this table's mapping and refresh both module-level caches."""
        forward = json.loads(self.fetch_mapping(table=self.tableName))
        mappings[self.tableName] = forward
        reverse_mappings[self.tableName] = {v: k for k, v in forward.items()}

    def fetch_mapping(self, m=0, table="events"):
        """Download the mapping for *table* and return the response text.

        Args:
            m: value sent as the ``multiplexer`` query parameter.
            table: value sent as the ``table`` query parameter.

        Raises:
            Exception: if the server does not answer with HTTP 200.
        """
        req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table)
        resp = requests.get(req)
        if resp.status_code != 200:
            raise Exception('Error accessing register allocations: %d'%resp.status_code)
        return resp.text

    def run_query_raw(self, query):
        """Substitute %tokens in *query*, run it, and return the raw response.

        If a token cannot be resolved the mapping is reloaded once and the
        substitution retried; a second failure propagates MappingException.

        Returns:
            The response object returned by the BigQuery jobs().query() call.
        """
        self._log_query("query", query)

        try:
            query = self._TOKEN_RE.sub(self.remap, query)
        except MappingException:
            # The cached mapping is missing or stale: refresh and retry once.
            self.reload_mapping()
            query = self._TOKEN_RE.sub(self.remap, query)

        self._log_query("remapped query", query)

        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            # NOTE(review): FLOW is None when flow initialisation failed at
            # import time, in which case this call cannot succeed.
            credentials = run(FLOW, storage)

        http = credentials.authorize(httplib2.Http())
        service = build('bigquery', 'v2', http=http)

        body = {"query": query,
                "timeoutMs": 60000}
        return service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

    def translate_schema(self, response):
        """Rename the schema fields of *response* in place via the reverse mapping."""
        reverse = self._reverse_mapping()
        for field in response["schema"]["fields"]:
            field["name"] = reverse.get(field["name"], field["name"])

    def run_query(self, query):
        """Run *query* and return its rows as a list of dicts.

        Each dict maps the translated column name to the cell value ("v") as
        delivered in the response.  An empty list is returned when the
        response has no "rows" entry.
        """
        response = self.run_query_raw(query)

        # Look the reverse mapping up only after the query ran, because
        # running it may have (re)loaded the mapping.
        reverse = self._reverse_mapping()
        columnNames = [reverse.get(field["name"], field["name"])
                       for field in response["schema"]["fields"]]

        result = []
        for row in response.get("rows", []):
            this_result = {}
            for (i, column) in enumerate(row["f"]):
                this_result[columnNames[i]] = column["v"]
            result.append(this_result)

        return result

    # filter_results, groupby_results, do_computed_fields, and
    # postprocess_results are all used for postprocessing queries. The idea is
    # to do one query that includes the ungrouped and unfiltered data, and
    # cache it for multiple consumers who will filter and group it as
    # necessary.
    #
    # TODO: Find a more generalized source for these sorts of operations.
    # Perhaps put the results in SQLite and then run SQL queries against it.
    #
    # The list/dict defaults on these methods are kept for interface
    # compatibility; none of them is mutated in place.

    def filter_results(self, rows, name, value):
        """Return the rows whose *name* field equals *value*."""
        return [row for row in rows if row.get(name)==value]

    def groupby_results(self, rows, groupBy=[], sum=[], count=[], avg=[], maxi=[]):
        """Group *rows* by the *groupBy* fields and aggregate the others.

        Args:
            rows: iterable of row dicts.
            groupBy: field names that identify a group.
            sum: fields to total; stored under "sum_<field>".
            count: fields whose distinct values are collected under
                "distinct_<field>" and counted under "count_<field>".
            avg: fields to average; stored under "avg_<field>".
            maxi: fields to maximise; stored under "max_<field>".  The running
                maximum starts at 0.

        Returns:
            A list with one dict per group.
        """
        new_rows = {}
        for row in rows:
            groupby_key = str([row.get(k, None) for k in groupBy])

            if groupby_key not in new_rows:
                new_rows[groupby_key] = dict((k, row.get(k, None)) for k in groupBy)
            new_row = new_rows[groupby_key]

            for k in sum:
                new_row["sum_" + k] = new_row.get("sum_" + k, 0) + to_number(row.get(k,0))

            for k in avg:
                # Accumulate total and sample count; the division happens in
                # the finalisation loop below.
                new_row["avg_" + k] = new_row.get("avg_" + k, 0) + to_number(row.get(k,0))
                new_row["avg_base_" + k] = new_row.get("avg_base_"+k,0) + 1

            for k in maxi:
                new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k,0)))

            for k in count:
                v = row.get(k,None)
                distinct = new_row.setdefault("distinct_" + k, [])
                if v not in distinct:
                    distinct.append(v)

        for row in new_rows.values():
            for k in avg:
                row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k]
                del row["avg_base_" + k]

            for k in count:
                # Must be written to *this* group's row; writing to the
                # variable left over from the loop above would give a count
                # to only one group.
                row["count_" + k] = len(row.get("distinct_" + k, []))

        return list(new_rows.values())

    def do_computed_fields(self, rows, computed=[]):
        """Add "a/b" style computed fields to every row, in place.

        Each entry of *computed* containing "/" is split into a numerator and
        a denominator field; their quotient is stored under
        "computed_<a>_div_<b>".  Rows lacking either field, or whose
        denominator is zero, are left without the computed field.

        Returns:
            A tuple (computedFieldNames, rows), where computedFieldNames lists
            the computed field names in first-seen order.  It is empty when
            there are no rows.
        """
        # The field names depend only on *computed*, so derive them once.
        specs = []
        for k in computed:
            if "/" in k:
                parts = k.split("/")
                name = "computed_" + parts[0].replace("%","")+"_div_"+parts[1].replace("%","")
                specs.append((name, parts[0], parts[1]))

        computedFieldNames=[]
        for row in rows:
            for (name, numerator, denominator) in specs:
                try:
                    row[name] = to_number(row[numerator]) / to_number(row[denominator])
                except (KeyError, ArithmeticError):
                    # Missing field or division by zero: skip this row.
                    pass

                if name not in computedFieldNames:
                    computedFieldNames.append(name)
        return (computedFieldNames, rows)

    def postprocess_results(self, rows, filter={}, groupBy=[], sum=[], count=[], avg=[], computed=[], maxi=[], maxDeltaTime=None):
        """Filter, time-limit, extend and group cached query rows.

        Steps, in order: keep rows matching every (field, value) pair in
        *filter*; if *maxDeltaTime* is given, keep rows whose "time" is within
        that distance of the newest row; add the *computed* fields; and, if
        *groupBy* is non-empty, group and aggregate (computed fields are
        summed).  "%" characters are stripped from all field-name lists
        except the keys of *filter*.

        Returns:
            The resulting list of row dicts.
        """
        strip = lambda names: [x.replace("%","") for x in names]
        sum = strip(sum)
        count = strip(count)
        avg = strip(avg)
        computed = strip(computed)
        maxi = strip(maxi)
        groupBy = strip(groupBy)

        for (k,v) in filter.items():
            rows = self.filter_results(rows, k, v)

        if rows and maxDeltaTime is not None:
            maxTime = max(float(row["time"]) for row in rows)
            rows = [row for row in rows if float(row["time"])>=maxTime-maxDeltaTime]

        (computedFieldNames, rows) = self.do_computed_fields(rows, computed)
        sum = sum + computedFieldNames
        if groupBy:
            rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
        return rows

    def remap(self, match):
        """Regex substitution callback: resolve one "%token" match.

        Raises:
            MappingException: if no mapping is cached for this table or the
                token is not part of it.
        """
        if self.tableName not in mappings:
            raise MappingException("no mapping for table %s" % self.tableName)

        mapping = mappings[self.tableName]

        token = match.group()[1:]   # drop the leading "%"
        if token not in mapping:
            raise MappingException('unknown token %s' % token)
        return mapping[token]

    def dump_table(self, rows, keys=None):
        """Print *rows* as a right-aligned, space-separated text table.

        Args:
            rows: list of row dicts.
            keys: columns to print; defaults to the keys of the first row.
                Nothing is printed when there are neither rows nor keys.
        """
        if not keys:
            if not rows:
                return
            keys = list(rows[0].keys())

        # Each column is as wide as its widest cell, header included.
        lens = {}
        for key in keys:
            lens[key] = len(key)

        for row in rows:
            for key in keys:
                lens[key] = max(lens[key], len(str(row.get(key,""))))

        print(" ".join("%*s" % (lens[key], key) for key in keys))

        for row in rows:
            print(" ".join("%*s" % (lens[key], str(row.get(key,""))) for key in keys))

    def schema_to_cols(self, schema):
        """Convert a response schema into a list of column descriptors.

        Each descriptor is a dict with "type" (translated from the schema
        field type), "id" ("Col<n>") and "label" (the translated field name).

        Raises:
            KeyError: for a field type other than STRING, INTEGER, FLOAT or
                TIMESTAMP.
        """
        colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"}
        reverse = self._reverse_mapping()

        cols = []
        for (i, field) in enumerate(schema["fields"]):
            cols.append({"type": colTypes[field["type"]],
                         "id": "Col%d" % i,
                         "label": reverse.get(field["name"],field["name"])})

        return cols
272
def main():
    """Run a sample per-hostname query and print the result as a table."""
    analytics = BigQueryAnalytics()
    query = "select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname"
    analytics.dump_table(analytics.run_query(query))

# Only run the sample query when executed as a script, not when imported.
if __name__ == "__main__":
    main()