only do maxDeltaTime if there are rows, make count only count distinct items, some...
[plstackapi.git] / planetstack / hpc_wizard / bigquery_analytics.py
1 import re
2 import base64
3 import requests
4 import urllib
5 import json
6 import httplib2
7 import threading
8 import os
9 import time
10 import traceback
11
12 from apiclient.discovery import build
13 from apiclient.errors import HttpError
14 from oauth2client.client import AccessTokenRefreshError
15 from oauth2client.client import OAuth2WebServerFlow
16 from oauth2client.client import flow_from_clientsecrets
17 from oauth2client.file import Storage
18 from oauth2client.tools import run_flow,run
19
20 """
21 yum -y install python-httplib2
22 easy_install python_gflags
23 easy_install google_api_python_client
24 """
25
26 PROJECT_NUMBER = '549187599759'
27
28 try:
29     FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
30                                    scope='https://www.googleapis.com/auth/bigquery')
31 except:
32     print "exception while initializing bigquery flow"
33     traceback.print_exc()
34     FLOW = None
35
36 MINUTE_MS = 60*1000
37 HOUR_MS = 60*60*1000
38
39 # global to hold cached mappings
40 mappings = {}
41 reverse_mappings = {}
42
43 def to_number(s):
44    try:
45        if "." in str(s):
46            return float(s)
47        else:
48            return int(s)
49    except:
50        return 0
51
52 class MappingException(Exception):
53     pass
54
55 class BigQueryAnalytics:
56     def __init__(self, table = "demoevents"):
57         self.projectName = "vicci"
58         self.tableName = table
59
60     def reload_mapping(self):
61         global mappings, reverse_mappings
62         mappings[self.tableName] = json.loads(self.fetch_mapping(table=self.tableName))
63         reverse_mappings[self.tableName] = {v:k for k, v in mappings[self.tableName].items()}
64
65     def fetch_mapping(self, m=0, table="events"):
66         req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table)
67         resp = requests.get(req)
68         if (resp.status_code==200):
69                 return resp.text
70         else:
71                 raise Exception('Error accessing register allocations: %d'%resp.status_code)
72
73     def run_query_raw(self, query):
74         try:
75             file("/tmp/query_log","a").write("query %s\n" % query)
76         except:
77             pass
78
79         p = re.compile('%[a-zA-z_]*')
80
81         try:
82             query = p.sub(self.remap, query)
83         except MappingException:
84             self.reload_mapping()
85             query = p.sub(self.remap, query)
86
87         try:
88             file("/tmp/query_log","a").write("remapped query %s\n" % query)
89         except:
90             pass
91
92         storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
93         credentials = storage.get()
94
95         if credentials is None or credentials.invalid:
96                 credentials = run(FLOW, storage)
97
98         http = httplib2.Http()
99         http = credentials.authorize(http)
100
101         service = build('bigquery', 'v2', http=http)
102
103         body = {"query": query,
104                 "timeoutMs": 60000}
105         response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()
106
107         return response
108
109     def translate_schema(self, response):
110         for field in response["schema"]["fields"]:
111             field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"])
112
113     def run_query(self, query):
114         response = self.run_query_raw(query)
115
116         fieldNames = []
117         for field in response["schema"]["fields"]:
118             fieldNames.append(field["name"])
119
120         result = []
121         if "rows" in response:
122             for row in response["rows"]:
123                 this_result = {}
124                 for (i,column) in enumerate(row["f"]):
125                     this_result[reverse_mappings[self.tableName].get(fieldNames[i],fieldNames[i])] = column["v"]
126                 result.append(this_result)
127
128         return result
129
130     """ Filter_results, groupby_results, do_computed_fields, and postprocess_results
131         are all used for postprocessing queries. The idea is to do one query that
132         includes the ungrouped and unfiltered data, and cache it for multiple
133         consumers who will filter and group it as necessary.
134
135         TODO: Find a more generalized source for these sorts operations. Perhaps
136         put the results in SQLite and then run SQL queries against it.
137     """
138
139     def filter_results(self, rows, name, value):
140         result = [row for row in rows if row.get(name)==value]
141         return result
142
143     def groupby_results(self, rows, groupBy=[], sum=[], count=[], avg=[], maxi=[]):
144         new_rows = {}
145         for row in rows:
146             groupby_key = [row.get(k, None) for k in groupBy]
147
148             if str(groupby_key) not in new_rows:
149                 new_row = {}
150                 for k in groupBy:
151                     new_row[k] = row.get(k, None)
152
153                 new_rows[str(groupby_key)] = new_row
154             else:
155                 new_row = new_rows[str(groupby_key)]
156
157             for k in sum:
158                 new_row["sum_" + k] = new_row.get("sum_" + k, 0) + to_number(row.get(k,0))
159
160             for k in avg:
161                 new_row["avg_" + k] = new_row.get("avg_" + k, 0) + to_number(row.get(k,0))
162                 new_row["avg_base_" + k] = new_row.get("avg_base_"+k,0) + 1
163
164             for k in maxi:
165                 new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k,0)))
166
167             for k in count:
168                 v = row.get(k,None)
169                 dl = new_row["distinct_" + k] = new_row.get("distinct_" + k, [])
170                 if (v not in dl):
171                     dl.append(v)
172
173                 #new_row["count_" + k] = new_row.get("count_" + k, 0) + 1
174
175         for row in new_rows.values():
176             for k in avg:
177                 row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k]
178                 del row["avg_base_" + k]
179
180             for k in count:
181                 new_row["count_" + k] = len(new_row.get("distinct_" + k, []))
182
183         return new_rows.values()
184
185     def do_computed_fields(self, rows, computed=[]):
186         computedFieldNames=[]
187         for row in rows:
188             for k in computed:
189                 if "/" in k:
190                     parts = k.split("/")
191                     computedFieldName = "computed_" + parts[0].replace("%","")+"_div_"+parts[1].replace("%","")
192                     try:
193                         row[computedFieldName] = to_number(row[parts[0]]) / to_number(row[parts[1]])
194                     except:
195                         pass
196
197                     if computedFieldName not in computedFieldNames:
198                         computedFieldNames.append(computedFieldName)
199         return (computedFieldNames, rows)
200
201     def postprocess_results(self, rows, filter={}, groupBy=[], sum=[], count=[], avg=[], computed=[], maxi=[], maxDeltaTime=None):
202         sum = [x.replace("%","") for x in sum]
203         count = [x.replace("%","") for x in count]
204         avg = [x.replace("%","") for x in avg]
205         computed = [x.replace("%","") for x in computed]
206         maxi = [x.replace("%","") for x in maxi]
207
208         for (k,v) in filter.items():
209             rows = self.filter_results(rows, k, v)
210
211         if rows:
212             if maxDeltaTime is not None:
213                 maxTime = max([float(row["time"]) for row in rows])
214                 rows = [row for row in rows if float(row["time"])>=maxTime-maxDeltaTime]
215
216         (computedFieldNames, rows) = self.do_computed_fields(rows, computed)
217         sum = sum + computedFieldNames
218         rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
219         return rows
220
221     def remap(self, match):
222         if not self.tableName in mappings:
223             raise MappingException("no mapping for table %s" % self.tableName)
224
225         mapping = mappings[self.tableName]
226
227         token = match.group()[1:]
228         if token in mapping:
229             return mapping[token]
230         else:
231             raise MappingException('unknown token %s' % token)
232
233     def dump_table(self, rows, keys=None):
234         if not keys:
235             keys = rows[0].keys()
236
237         lens = {}
238         for key in keys:
239             lens[key] = len(key)
240
241         for row in rows:
242             for key in keys:
243                 thislen = len(str(row.get(key,"")))
244                 lens[key] = max(lens.get(key,0), thislen)
245
246         for key in keys:
247             print "%*s" % (lens[key], key),
248         print
249
250         for row in rows:
251             for key in keys:
252                 print "%*s" % (lens[key], str(row.get(key,""))),
253             print
254
255     def schema_to_cols(self, schema):
256         fields = schema["fields"]
257
258         colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"}
259
260         cols = []
261         i=0
262         for field in fields:
263             col = {"type": colTypes[field["type"]],
264                    "id": "Col%d" % i,
265                    "label": reverse_mappings[self.tableName].get(field["name"],field["name"])}
266             cols.append(col)
267             i=i+1
268
269         return cols
270
271 def main():
272     bq = BigQueryAnalytics()
273
274     rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")
275
276     bq.dump_table(rows)
277
278 if __name__ == "__main__":
279     main()