# planetstack/hpc_wizard/bigquery_analytics.py (from plstackapi.git)
import re
import base64
import requests
import urllib
import json
import httplib2
import threading
import os
import sys
import time
import traceback

from apiclient.discovery import build
from apiclient.errors import HttpError
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
from oauth2client.tools import run_flow, run

"""
yum -y install python-httplib2
easy_install python_gflags
easy_install google_api_python_client
"""

PROJECT_NUMBER = '549187599759'

try:
    FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
                                   scope='https://www.googleapis.com/auth/bigquery')
    BIGQUERY_AVAILABLE = True
except:
    print >> sys.stderr, "exception while initializing bigquery flow"
    traceback.print_exc()
    FLOW = None
    BIGQUERY_AVAILABLE = False

MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000

# global to hold cached mappings
mappings = {}
reverse_mappings = {}

def to_number(s):
    try:
        if "." in str(s):
            return float(s)
        else:
            return int(s)
    except:
        return 0

class MappingException(Exception):
    pass

class BigQueryAnalytics:
    def __init__(self, table="demoevents"):
        self.projectName = "vicci"
        self.tableName = table

    def reload_mapping(self):
        # Fetch and cache the field-name mapping for this table, along with its inverse.
        global mappings, reverse_mappings
        mappings[self.tableName] = json.loads(self.fetch_mapping(table=self.tableName))
        reverse_mappings[self.tableName] = {v: k for k, v in mappings[self.tableName].items()}

    def fetch_mapping(self, m=0, table="events"):
        # Retrieve the friendly-name -> BigQuery-column mapping from the cloud-scrutiny service.
        req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s' % (m, table)
        resp = requests.get(req)
        if resp.status_code == 200:
            return resp.text
        else:
            raise Exception('Error accessing register allocations: %d' % resp.status_code)

    def run_query_raw(self, query):
        try:
            file("/tmp/query_log", "a").write("query %s\n" % query)
        except:
            pass

        # %name tokens in the query are remapped to BigQuery column names.
        p = re.compile('%[a-zA-Z_]*')

        try:
            query = p.sub(self.remap, query)
        except MappingException:
            # The mapping may be stale or missing; reload it and retry once.
            self.reload_mapping()
            query = p.sub(self.remap, query)

        try:
            file("/tmp/query_log", "a").write("remapped query %s\n" % query)
        except:
            pass

        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            credentials = run(FLOW, storage)

        http = httplib2.Http()
        http = credentials.authorize(http)

        service = build('bigquery', 'v2', http=http)

        body = {"query": query,
                "timeoutMs": 60000}
        response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

        return response

    def translate_schema(self, response):
        for field in response["schema"]["fields"]:
            field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"])

    def run_query(self, query):
        if not BIGQUERY_AVAILABLE:
            print >> sys.stderr, "bigquery_analytics: bigquery flow is not available. returning empty result."
            return []

        response = self.run_query_raw(query)

        fieldNames = []
        for field in response["schema"]["fields"]:
            fieldNames.append(field["name"])

        result = []
        if "rows" in response:
            for row in response["rows"]:
                this_result = {}
                for (i, column) in enumerate(row["f"]):
                    this_result[reverse_mappings[self.tableName].get(fieldNames[i], fieldNames[i])] = column["v"]
                result.append(this_result)

        return result

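    # A minimal sketch of what run_query() above returns (the field names and
    # values here are hypothetical, not taken from a real demoevents table):
    #
    #   bq.run_query("select %hostname, %bytes_sent from [vicci.demoevents]")
    #   -> [{"hostname": "node1.example.org", "bytes_sent": "12345"}, ...]
    #
    # BigQuery returns cell values as strings; to_number() is used later to
    # coerce them when aggregating.
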
    """ filter_results, groupby_results, do_computed_fields, and postprocess_results
        are all used for postprocessing queries. The idea is to run one query that
        includes the ungrouped and unfiltered data, cache it, and let multiple
        consumers filter and group it as necessary.

        TODO: Find a more generalized source for these sorts of operations. Perhaps
        put the results in SQLite and then run SQL queries against it.
    """

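    # A minimal usage sketch of that pipeline (field names and values here are
    # hypothetical, and the example assumes the table's "time" column is in
    # seconds):
    #
    #   rows = bq.run_query("select %time, %hostname, %site, %bytes_sent "
    #                       "from [vicci.demoevents]")
    #   summary = bq.postprocess_results(rows,
    #                                    filter={"site": "princeton"},
    #                                    groupBy=["hostname"],
    #                                    sum=["bytes_sent"],
    #                                    avg=["bytes_sent"],
    #                                    maxDeltaTime=15 * 60)
    #
    # This keeps only rows for one site from the last 15 minutes, then sums and
    # averages bytes_sent per hostname.
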
    def filter_results(self, rows, name, value):
        result = [row for row in rows if row.get(name) == value]
        return result

    def groupby_results(self, rows, groupBy=[], sum=[], count=[], avg=[], maxi=[]):
        new_rows = {}
        for row in rows:
            groupby_key = [row.get(k, None) for k in groupBy]

            if str(groupby_key) not in new_rows:
                new_row = {}
                for k in groupBy:
                    new_row[k] = row.get(k, None)

                new_rows[str(groupby_key)] = new_row
            else:
                new_row = new_rows[str(groupby_key)]

            for k in sum:
                new_row["sum_" + k] = new_row.get("sum_" + k, 0) + to_number(row.get(k, 0))

            for k in avg:
                # Accumulate the running total and sample count; the average
                # itself is computed after all rows have been seen.
                new_row["avg_" + k] = new_row.get("avg_" + k, 0) + to_number(row.get(k, 0))
                new_row["avg_base_" + k] = new_row.get("avg_base_" + k, 0) + 1

            for k in maxi:
                new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k, 0)))

            for k in count:
                # "count" counts distinct values of the field within each group.
                v = row.get(k, None)
                dl = new_row["distinct_" + k] = new_row.get("distinct_" + k, [])
                if v not in dl:
                    dl.append(v)

                # new_row["count_" + k] = new_row.get("count_" + k, 0) + 1

        for row in new_rows.values():
            for k in avg:
                row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k]
                del row["avg_base_" + k]

            for k in count:
                row["count_" + k] = len(row.get("distinct_" + k, []))

        return new_rows.values()

    def do_computed_fields(self, rows, computed=[]):
        computedFieldNames = []
        for row in rows:
            for k in computed:
                if "/" in k:
                    # A computed field of the form "a/b" becomes computed_a_div_b.
                    parts = k.split("/")
                    computedFieldName = "computed_" + parts[0].replace("%", "") + "_div_" + parts[1].replace("%", "")
                    try:
                        # Force float division to avoid Python 2 integer truncation.
                        row[computedFieldName] = float(to_number(row[parts[0]])) / to_number(row[parts[1]])
                    except:
                        pass

                    if computedFieldName not in computedFieldNames:
                        computedFieldNames.append(computedFieldName)
        return (computedFieldNames, rows)

    def postprocess_results(self, rows, filter={}, groupBy=[], sum=[], count=[], avg=[], computed=[], maxi=[], maxDeltaTime=None):
        sum = [x.replace("%", "") for x in sum]
        count = [x.replace("%", "") for x in count]
        avg = [x.replace("%", "") for x in avg]
        computed = [x.replace("%", "") for x in computed]
        maxi = [x.replace("%", "") for x in maxi]
        groupBy = [x.replace("%", "") for x in groupBy]

        for (k, v) in filter.items():
            rows = self.filter_results(rows, k, v)

        if rows:
            if maxDeltaTime is not None:
                # Keep only rows within maxDeltaTime of the most recent timestamp.
                maxTime = max([float(row["time"]) for row in rows])
                rows = [row for row in rows if float(row["time"]) >= maxTime - maxDeltaTime]

        (computedFieldNames, rows) = self.do_computed_fields(rows, computed)
        sum = sum + computedFieldNames
        if groupBy:
            rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
        return rows

    def remap(self, match):
        if self.tableName not in mappings:
            raise MappingException("no mapping for table %s" % self.tableName)

        mapping = mappings[self.tableName]

        token = match.group()[1:]
        if token in mapping:
            return mapping[token]
        else:
            raise MappingException('unknown token %s' % token)

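    # Example of what remap() above does, assuming the fetched mapping for this
    # table contains an entry like {"hostname": "s8"} (illustrative only): the
    # %-token pattern in run_query_raw() rewrites
    #
    #   "select %hostname from [vicci.demoevents]"
    # as
    #   "select s8 from [vicci.demoevents]"
    #
    # An unknown token raises MappingException, which makes run_query_raw()
    # reload the mapping and retry.
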
    def dump_table(self, rows, keys=None):
        if not keys:
            keys = rows[0].keys()

        # Compute a column width for each key based on the widest value.
        lens = {}
        for key in keys:
            lens[key] = len(key)

        for row in rows:
            for key in keys:
                thislen = len(str(row.get(key, "")))
                lens[key] = max(lens.get(key, 0), thislen)

        for key in keys:
            print "%*s" % (lens[key], key),
        print

        for row in rows:
            for key in keys:
                print "%*s" % (lens[key], str(row.get(key, ""))),
            print

    def schema_to_cols(self, schema):
        fields = schema["fields"]

        colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"}

        cols = []
        i = 0
        for field in fields:
            col = {"type": colTypes[field["type"]],
                   "id": "Col%d" % i,
                   "label": reverse_mappings[self.tableName].get(field["name"], field["name"])}
            cols.append(col)
            i = i + 1

        return cols

def main():
    bq = BigQueryAnalytics()

    rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")

    bq.dump_table(rows)

if __name__ == "__main__":
    main()