postprocessing support
[plstackapi.git] / planetstack / hpc_wizard / bigquery_analytics.py
1 import re
2 import base64
3 import requests
4 import urllib
5 import json
6 import httplib2
7 import threading
8 import os
9 import time
10 import traceback
11
12 from apiclient.discovery import build
13 from apiclient.errors import HttpError
14 from oauth2client.client import AccessTokenRefreshError
15 from oauth2client.client import OAuth2WebServerFlow
16 from oauth2client.client import flow_from_clientsecrets
17 from oauth2client.file import Storage
18 from oauth2client.tools import run_flow,run
19
20 """
21 yum -y install python-httplib2
22 easy_install python_gflags
23 easy_install google_api_python_client
24 """
25
26 PROJECT_NUMBER = '549187599759'
27
28 try:
29     FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
30                                    scope='https://www.googleapis.com/auth/bigquery')
31 except:
32     print "exception while initializing bigquery flow"
33     traceback.print_exc()
34     FLOW = None
35
36 MINUTE_MS = 60*1000
37 HOUR_MS = 60*60*1000
38
39 # global to hold cached mappings
40 mappings = {}
41 reverse_mappings = {}
42
43 def to_number(s):
44    try:
45        if "." in str(s):
46            return float(s)
47        else:
48            return int(s)
49    except:
50        return 0
51
52 class MappingException(Exception):
53     pass
54
55 class BigQueryAnalytics:
56     def __init__(self, table = "demoevents"):
57         self.projectName = "vicci"
58         self.tableName = table
59
60     def reload_mapping(self):
61         global mappings, reverse_mappings
62         mappings[self.tableName] = json.loads(self.fetch_mapping(table=self.tableName))
63         reverse_mappings[self.tableName] = {v:k for k, v in mappings[self.tableName].items()}
64
65     def fetch_mapping(self, m=0, table="events"):
66         req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table)
67         resp = requests.get(req)
68         if (resp.status_code==200):
69                 return resp.text
70         else:
71                 raise Exception('Error accessing register allocations: %d'%resp.status_code)
72
73     def run_query_raw(self, query):
74         p = re.compile('%[a-zA-z_]*')
75
76         try:
77             query = p.sub(self.remap, query)
78         except MappingException:
79             self.reload_mapping()
80             query = p.sub(self.remap, query)
81
82         storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
83         credentials = storage.get()
84
85         if credentials is None or credentials.invalid:
86                 credentials = run(FLOW, storage)
87
88         http = httplib2.Http()
89         http = credentials.authorize(http)
90
91         service = build('bigquery', 'v2', http=http)
92
93         body = {"query": query,
94                 "timeoutMs": 30000}
95         response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()
96
97         return response
98
99     def translate_schema(self, response):
100         for field in response["schema"]["fields"]:
101             field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"])
102
103     def run_query(self, query):
104         response = self.run_query_raw(query)
105
106         fieldNames = []
107         for field in response["schema"]["fields"]:
108             fieldNames.append(field["name"])
109
110         result = []
111         if "rows" in response:
112             for row in response["rows"]:
113                 this_result = {}
114                 for (i,column) in enumerate(row["f"]):
115                     this_result[reverse_mappings[self.tableName].get(fieldNames[i],fieldNames[i])] = column["v"]
116                 result.append(this_result)
117
118         return result
119
120     """ Filter_results, groupby_results, do_computed_fields, and postprocess_results
121         are all used for postprocessing queries. The idea is to do one query that
122         includes the ungrouped and unfiltered data, and cache it for multiple
123         consumers who will filter and group it as necessary.
124
125         TODO: Find a more generalized source for these sorts operations. Perhaps
126         put the results in SQLite and then run SQL queries against it.
127     """
128
129     def filter_results(self, rows, name, value):
130         result = [row for row in rows if row.get(name)==value]
131         return result
132
133     def groupby_results(self, rows, groupBy=[], sum=[], count=[], avg=[], maxi=[]):
134         new_rows = {}
135         for row in rows:
136             groupby_key = [row.get(k, None) for k in groupBy]
137
138             if str(groupby_key) not in new_rows:
139                 new_row = {}
140                 for k in groupBy:
141                     new_row[k] = row.get(k, None)
142
143                 new_rows[str(groupby_key)] = new_row
144             else:
145                 new_row = new_rows[str(groupby_key)]
146
147             for k in sum:
148                 new_row["sum_" + k] = new_row.get("sum_" + k, 0) + to_number(row.get(k,0))
149
150             for k in avg:
151                 new_row["avg_" + k] = new_row.get("avg_" + k, 0) + to_number(row.get(k,0))
152                 new_row["avg_base_" + k] = new_row.get("avg_base_"+k,0) + 1
153
154             for k in maxi:
155                 new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k,0)))
156
157             for k in count:
158                 new_row["count_" + k] = new_row.get("count_" + k, 0) + 1
159
160         for row in new_rows.values():
161             for k in avg:
162                 row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k]
163                 del row["avg_base_" + k]
164
165         return new_rows.values()
166
167     def do_computed_fields(self, rows, computed=[]):
168         computedFieldNames=[]
169         for row in rows:
170             for k in computed:
171                 if "/" in k:
172                     parts = k.split("/")
173                     computedFieldName = "computed_" + parts[0].replace("%","")+"_div_"+parts[1].replace("%","")
174                     try:
175                         row[computedFieldName] = to_number(row[parts[0]]) / to_number(row[parts[1]])
176                     except:
177                         pass
178
179                     if computedFieldName not in computedFieldNames:
180                         computedFieldNames.append(computedFieldName)
181         return (computedFieldNames, rows)
182
183     def postprocess_results(self, rows, filter={}, groupBy=[], sum=[], count=[], avg=[], computed=[], maxi=[], maxDeltaTime=None):
184         sum = [x.replace("%","") for x in sum]
185         count = [x.replace("%","") for x in count]
186         avg = [x.replace("%","") for x in avg]
187         computed = [x.replace("%","") for x in computed]
188         maxi = [x.replace("%","") for x in maxi]
189
190         for (k,v) in filter.items():
191             rows = self.filter_results(rows, k, v)
192
193         if maxDeltaTime is not None:
194             maxTime = max([float(row["time"]) for row in rows])
195             rows = [row for row in rows if float(row["time"])>=maxTime-maxDeltaTime]
196
197         (computedFieldNames, rows) = self.do_computed_fields(rows, computed)
198         sum = sum + computedFieldNames
199         rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
200         return rows
201
202     def remap(self, match):
203         if not self.tableName in mappings:
204             raise MappingException("no mapping for table %s" % self.tableName)
205
206         mapping = mappings[self.tableName]
207
208         token = match.group()[1:]
209         if token in mapping:
210             return mapping[token]
211         else:
212             raise MappingException('unknown token %s' % token)
213
214     def dump_table(self, rows, keys=None):
215         if not keys:
216             keys = rows[0].keys()
217
218         lens = {}
219         for key in keys:
220             lens[key] = len(key)
221
222         for row in rows:
223             for key in keys:
224                 thislen = len(str(row.get(key,"")))
225                 lens[key] = max(lens.get(key,0), thislen)
226
227         for key in keys:
228             print "%*s" % (lens[key], key),
229         print
230
231         for row in rows:
232             for key in keys:
233                 print "%*s" % (lens[key], str(row.get(key,""))),
234             print
235
236     def schema_to_cols(self, schema):
237         fields = schema["fields"]
238
239         colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"}
240
241         cols = []
242         i=0
243         for field in fields:
244             col = {"type": colTypes[field["type"]],
245                    "id": "Col%d" % i,
246                    "label": reverse_mappings[self.tableName].get(field["name"],field["name"])}
247             cols.append(col)
248             i=i+1
249
250         return cols
251
252 def main():
253     bq = BigQueryAnalytics()
254
255     rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")
256
257     bq.dump_table(rows)
258
259 if __name__ == "__main__":
260     main()