cached query support, latest arg to compose_query, compose_latest_query
[plstackapi.git] / planetstack / hpc_wizard / bigquery_analytics.py
1 import re
2 import base64
3 import requests
4 import urllib
5 import json
6 import httplib2
7 import threading
8 import os
9 import time
10 import traceback
11
12 from apiclient.discovery import build
13 from apiclient.errors import HttpError
14 from oauth2client.client import AccessTokenRefreshError
15 from oauth2client.client import OAuth2WebServerFlow
16 from oauth2client.client import flow_from_clientsecrets
17 from oauth2client.file import Storage
18 from oauth2client.tools import run_flow,run
19
# Dependency installation notes (kept as a module-level string).
"""
yum -y install python-httplib2
easy_install python_gflags
easy_install google_api_python_client
"""

# Google Cloud project number that BigQuery query jobs are submitted under.
PROJECT_NUMBER = '549187599759'

# OAuth2 flow used to obtain BigQuery credentials when none are cached.
# If the client secrets cannot be loaded, the error is printed and FLOW is
# left as None so that importing this module still succeeds.
try:
    FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
                                   scope='https://www.googleapis.com/auth/bigquery')
except Exception:
    # Deliberately not a bare ``except:`` -- KeyboardInterrupt and
    # SystemExit must still propagate.
    print("exception while initializing bigquery flow")
    traceback.print_exc()
    FLOW = None

# Durations in milliseconds.
MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000

# Module-level caches shared by every BigQueryAnalytics instance, keyed by
# table name:
#   mappings[table]          -> {token: physical column name}
#   reverse_mappings[table]  -> {physical column name: token}
mappings = {}
reverse_mappings = {}
41 reverse_mappings = {}
42
class MappingException(Exception):
    """Raised when a query's %token cannot be translated to a column name.

    BigQueryAnalytics.remap raises it both when the table has no cached
    mapping and when a token is missing from that mapping.
    """
45
class BigQueryAnalytics:
    """Run BigQuery queries written against symbolic ``%token`` column names.

    Before a query is submitted, every ``%token`` is replaced with the
    physical column name from the table's mapping. That mapping is fetched
    from cloud-scrutiny.appspot.com and cached in the module-level
    ``mappings`` dict. Column names in results are translated back through
    ``reverse_mappings`` wherever an entry exists.
    """

    # '%' followed by one or more ASCII letters/underscores. The character
    # class is 'A-Z', not 'A-z': 'A-z' also spans the characters [ \ ] ^ and `,
    # so a token directly followed by ']' was captured whole and rejected.
    # '+' rather than '*' so a lone '%' (e.g. the modulo operator) is left
    # untouched instead of being treated as an empty, always-unknown token.
    _TOKEN_RE = re.compile(r'%[a-zA-Z_]+')

    # Endpoint that serves a table's token -> column mapping.
    _MAPPING_URL = 'http://cloud-scrutiny.appspot.com/command'

    # Seconds to wait on the mapping service before giving up.
    _HTTP_TIMEOUT = 60

    # Cached OAuth2 credentials for BigQuery.
    _CREDENTIALS_FILE = '/opt/planetstack/hpc_wizard/bigquery_credentials.dat'

    # BigQuery schema field types -> column types emitted by schema_to_cols().
    _COL_TYPES = {"STRING": "string", "INTEGER": "number", "FLOAT": "number",
                  "TIMESTAMP": "date", "BOOLEAN": "boolean"}

    def __init__(self, table = "demoevents"):
        """Create an analytics helper for BigQuery table *table*."""
        self.projectName = "vicci"
        self.tableName = table

    def reload_mapping(self):
        """Fetch this table's mapping and refresh both module-level caches."""
        # Item assignment only -- no rebinding, so no ``global`` is needed.
        mapping = json.loads(self.fetch_mapping(table=self.tableName))
        mappings[self.tableName] = mapping
        reverse_mappings[self.tableName] = {v: k for k, v in mapping.items()}

    def fetch_mapping(self, m=0, table="events"):
        """Return the raw JSON text of the register allocations for *table*.

        :param m: multiplexer number, sent as an integer.
        :param table: table name; URL-encoded by requests.
        :raises Exception: when the service answers with a non-200 status.
        """
        resp = requests.get(self._MAPPING_URL,
                            params={"action": "get_allocations",
                                    "multiplexer": "%d" % m,
                                    "table": table},
                            timeout=self._HTTP_TIMEOUT)
        if resp.status_code == 200:
            return resp.text
        raise Exception('Error accessing register allocations: %d' % resp.status_code)

    def _remap_query(self, query):
        """Return *query* with every %token replaced by its column name."""
        try:
            return self._TOKEN_RE.sub(self.remap, query)
        except MappingException:
            # Mapping missing or stale: refetch it once and retry. A second
            # failure propagates to the caller.
            self.reload_mapping()
            return self._TOKEN_RE.sub(self.remap, query)

    def run_query_raw(self, query):
        """Remap *query*, run it synchronously, and return BigQuery's response.

        NOTE(review): jobs.query returns only the first page of rows. It can
        also return jobComplete=false with no schema if the query outlasts
        timeoutMs. Confirm that callers' queries are small and fast enough.
        """
        query = self._remap_query(query)

        storage = Storage(self._CREDENTIALS_FILE)
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            if FLOW is None:
                # Previously this crashed obscurely inside run(None, ...).
                raise Exception("no valid BigQuery credentials in %s and the "
                                "OAuth flow failed to initialize"
                                % self._CREDENTIALS_FILE)
            credentials = run(FLOW, storage)

        http = credentials.authorize(httplib2.Http())
        service = build('bigquery', 'v2', http=http)

        body = {"query": query,
                "timeoutMs": 30000}
        return service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

    def _unmap(self, name):
        """Return the symbolic token for column *name*, or *name* if unmapped.

        Uses ``.get`` because a query without %tokens never loads the mapping.
        """
        return reverse_mappings.get(self.tableName, {}).get(name, name)

    def translate_schema(self, response):
        """Rename the fields of *response*'s schema to symbolic names, in place."""
        for field in response["schema"]["fields"]:
            field["name"] = self._unmap(field["name"])

    def run_query(self, query):
        """Run *query* and return its rows as a list of {column: value} dicts.

        Column keys use the symbolic names where a reverse mapping exists.
        """
        response = self.run_query_raw(query)
        names = [self._unmap(field["name"]) for field in response["schema"]["fields"]]
        # BigQuery omits "rows" entirely when the result is empty.
        return [dict(zip(names, (cell["v"] for cell in row["f"])))
                for row in response.get("rows", [])]

    def remap(self, match):
        """``re.sub`` callback: map the matched %token to its column name.

        :raises MappingException: if the table has no cached mapping or the
            token is not in it.
        """
        if self.tableName not in mappings:
            raise MappingException("no mapping for table %s" % self.tableName)

        mapping = mappings[self.tableName]
        token = match.group()[1:]   # strip the leading '%'
        if token not in mapping:
            raise MappingException('unknown token %s' % token)
        return mapping[token]

    def dump_table(self, rows, keys=None):
        """Print *rows* (dicts) as right-aligned, space-separated columns.

        :param keys: columns to print, in order; defaults to the first row's
            keys. With no keys and no rows, nothing is printed.
        """
        if not keys:
            if not rows:
                return
            keys = list(rows[0].keys())

        widths = {}
        for key in keys:
            widths[key] = max([len(str(key))] +
                              [len(str(row.get(key, ""))) for row in rows])

        print(" ".join("%*s" % (widths[key], key) for key in keys))
        for row in rows:
            print(" ".join("%*s" % (widths[key], str(row.get(key, ""))) for key in keys))

    def schema_to_cols(self, schema):
        """Convert a BigQuery schema into a list of column descriptors.

        Each descriptor is {"type", "id": "Col<i>", "label"}; the label is the
        symbolic name where a reverse mapping exists.

        :raises KeyError: for a field type not listed in ``_COL_TYPES``.
        """
        return [{"type": self._COL_TYPES[field["type"]],
                 "id": "Col%d" % i,
                 "label": self._unmap(field["name"])}
                for i, field in enumerate(schema["fields"])]
160
def main():
    """Print %bytes_sent summed per %hostname for the default demo table."""
    analytics = BigQueryAnalytics()
    query = "select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname"
    analytics.dump_table(analytics.run_query(query))

if __name__ == "__main__":
    main()
168 if __name__ == "__main__":
169     main()