import re
import base64
import requests
import urllib
import json
import httplib2
import threading
import os
import time
import traceback

from apiclient.discovery import build
from apiclient.errors import HttpError
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
from oauth2client.tools import run_flow, run

20 """
21 yum -y install python-httplib2
22 easy_install python_gflags
23 easy_install google_api_python_client
24 """
25
PROJECT_NUMBER = '549187599759'

try:
    FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
                                   scope='https://www.googleapis.com/auth/bigquery')
except:
    print "exception while initializing bigquery flow"
    traceback.print_exc()
    FLOW = None

MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000

# globals to hold cached mappings
mappings = {}
reverse_mappings = {}

def to_number(s):
    try:
        if "." in str(s):
            return float(s)
        else:
            return int(s)
    except:
        return 0

class MappingException(Exception):
    pass

class BigQueryAnalytics:
    def __init__(self, table = "demoevents"):
        self.projectName = "vicci"
        self.tableName = table

    def reload_mapping(self):
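        """ Re-fetch the column-name mapping for this table and rebuild the
            reverse mapping used to translate BigQuery results back to
            friendly field names. """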
        global mappings, reverse_mappings
        mappings[self.tableName] = json.loads(self.fetch_mapping(table=self.tableName))
        reverse_mappings[self.tableName] = {v:k for k, v in mappings[self.tableName].items()}

    def fetch_mapping(self, m=0, table="events"):
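        """ Ask the cloud-scrutiny allocation service for the mapping of
            friendly field names to BigQuery column names, returned as a JSON
            string. Raises an Exception on any non-200 response. """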
        req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table)
        resp = requests.get(req)
        if (resp.status_code==200):
            return resp.text
        else:
            raise Exception('Error accessing register allocations: %d'%resp.status_code)

    def run_query_raw(self, query):
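        """ Substitute %field tokens in the query using the cached mapping
            (reloading it once on a miss), authorize against BigQuery with the
            stored OAuth2 credentials, and return the raw query response. """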
        try:
            file("/tmp/query_log","a").write("query %s\n" % query)
        except:
            pass

        # match %field tokens so they can be remapped to BigQuery column names
        p = re.compile('%[a-zA-Z_]*')

        try:
            query = p.sub(self.remap, query)
        except MappingException:
            self.reload_mapping()
            query = p.sub(self.remap, query)

        try:
            file("/tmp/query_log","a").write("remapped query %s\n" % query)
        except:
            pass

        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            credentials = run(FLOW, storage)

        http = httplib2.Http()
        http = credentials.authorize(http)

        service = build('bigquery', 'v2', http=http)

        body = {"query": query,
                "timeoutMs": 60000}
        response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

        return response

    def translate_schema(self, response):
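        """ Rewrite the field names in a BigQuery response schema back to
            their friendly (unmapped) names, in place. """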
        for field in response["schema"]["fields"]:
            field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"])

    def run_query(self, query):
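        """ Run a query and return the result rows as a list of dictionaries
            keyed by the friendly field names. """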
        response = self.run_query_raw(query)

        fieldNames = []
        for field in response["schema"]["fields"]:
            fieldNames.append(field["name"])

        result = []
        if "rows" in response:
            for row in response["rows"]:
                this_result = {}
                for (i,column) in enumerate(row["f"]):
                    this_result[reverse_mappings[self.tableName].get(fieldNames[i],fieldNames[i])] = column["v"]
                result.append(this_result)

        return result

130     """ Filter_results, groupby_results, do_computed_fields, and postprocess_results
131         are all used for postprocessing queries. The idea is to do one query that
132         includes the ungrouped and unfiltered data, and cache it for multiple
133         consumers who will filter and group it as necessary.
134
135         TODO: Find a more generalized source for these sorts operations. Perhaps
136         put the results in SQLite and then run SQL queries against it.
137     """
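    # Illustrative sketch of the intended flow (the field names follow the
    # example in main(); the groupBy/sum arguments shown here are assumptions):
    #
    #   bq = BigQueryAnalytics()
    #   rows = bq.run_query("select %hostname, %bytes_sent from [vicci.demoevents]")
    #   rows = bq.postprocess_results(rows, groupBy=["hostname"], sum=["bytes_sent"])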

    def filter_results(self, rows, name, value):
        result = [row for row in rows if row.get(name)==value]
        return result

    def groupby_results(self, rows, groupBy=[], sum=[], count=[], avg=[], maxi=[]):
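        """ Group rows in memory by the groupBy fields, computing per-group
            sum_*, avg_*, max_*, and count_* (distinct) aggregates for the
            named fields. Returns the grouped rows. """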
        new_rows = {}
        for row in rows:
            groupby_key = [row.get(k, None) for k in groupBy]

            if str(groupby_key) not in new_rows:
                new_row = {}
                for k in groupBy:
                    new_row[k] = row.get(k, None)

                new_rows[str(groupby_key)] = new_row
            else:
                new_row = new_rows[str(groupby_key)]

            for k in sum:
                new_row["sum_" + k] = new_row.get("sum_" + k, 0) + to_number(row.get(k,0))

            for k in avg:
                new_row["avg_" + k] = new_row.get("avg_" + k, 0) + to_number(row.get(k,0))
                new_row["avg_base_" + k] = new_row.get("avg_base_"+k,0) + 1

            for k in maxi:
                new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k,0)))

            for k in count:
                v = row.get(k,None)
                dl = new_row["distinct_" + k] = new_row.get("distinct_" + k, [])
                if (v not in dl):
                    dl.append(v)

                #new_row["count_" + k] = new_row.get("count_" + k, 0) + 1

        for row in new_rows.values():
            for k in avg:
                row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k]
                del row["avg_base_" + k]

            for k in count:
                row["count_" + k] = len(row.get("distinct_" + k, []))

        return new_rows.values()

    def do_computed_fields(self, rows, computed=[]):
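        """ For each "a/b" entry in computed, add a computed_a_div_b field to
            every row (skipping rows where the division fails) and return the
            list of new field names together with the rows. """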
        computedFieldNames=[]
        for row in rows:
            for k in computed:
                if "/" in k:
                    parts = k.split("/")
                    computedFieldName = "computed_" + parts[0].replace("%","")+"_div_"+parts[1].replace("%","")
                    try:
                        row[computedFieldName] = to_number(row[parts[0]]) / to_number(row[parts[1]])
                    except:
                        pass

                    if computedFieldName not in computedFieldNames:
                        computedFieldNames.append(computedFieldName)
        return (computedFieldNames, rows)

    def postprocess_results(self, rows, filter={}, groupBy=[], sum=[], count=[], avg=[], computed=[], maxi=[], maxDeltaTime=None):
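        """ Apply the full postprocessing pipeline to cached query results:
            strip % prefixes from field names, filter rows, optionally keep
            only rows within maxDeltaTime of the newest row, add computed
            fields, and group with the requested aggregates. """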
        sum = [x.replace("%","") for x in sum]
        count = [x.replace("%","") for x in count]
        avg = [x.replace("%","") for x in avg]
        computed = [x.replace("%","") for x in computed]
        maxi = [x.replace("%","") for x in maxi]
        groupBy = [x.replace("%","") for x in groupBy]

        for (k,v) in filter.items():
            rows = self.filter_results(rows, k, v)

        if rows:
            if maxDeltaTime is not None:
                maxTime = max([float(row["time"]) for row in rows])
                rows = [row for row in rows if float(row["time"])>=maxTime-maxDeltaTime]

        (computedFieldNames, rows) = self.do_computed_fields(rows, computed)
        sum = sum + computedFieldNames
        rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
        return rows

    def remap(self, match):
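        """ re.sub callback: translate a %field token into its BigQuery column
            name, raising MappingException if the table or token has no
            mapping. """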
        if not self.tableName in mappings:
            raise MappingException("no mapping for table %s" % self.tableName)

        mapping = mappings[self.tableName]

        token = match.group()[1:]
        if token in mapping:
            return mapping[token]
        else:
            raise MappingException('unknown token %s' % token)

    def dump_table(self, rows, keys=None):
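        """ Print rows as a simple column-aligned text table, using keys (or
            the keys of the first row) as the column set. """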
        if not keys:
            keys = rows[0].keys()

        lens = {}
        for key in keys:
            lens[key] = len(key)

        for row in rows:
            for key in keys:
                thislen = len(str(row.get(key,"")))
                lens[key] = max(lens.get(key,0), thislen)

        for key in keys:
            print "%*s" % (lens[key], key),
        print

        for row in rows:
            for key in keys:
                print "%*s" % (lens[key], str(row.get(key,""))),
            print

    def schema_to_cols(self, schema):
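        """ Convert a BigQuery schema into a list of column descriptors
            (type, id, label), with labels translated back to the friendly
            field names. """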
        fields = schema["fields"]

        colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"}

        cols = []
        i=0
        for field in fields:
            col = {"type": colTypes[field["type"]],
                   "id": "Col%d" % i,
                   "label": reverse_mappings[self.tableName].get(field["name"],field["name"])}
            cols.append(col)
            i=i+1

        return cols

def main():
    bq = BigQueryAnalytics()

    rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")

    bq.dump_table(rows)

if __name__ == "__main__":
    main()