longer timeout for bigquery queries
plstackapi.git: planetstack/hpc_wizard/bigquery_analytics.py
import re
import base64
import requests
import urllib
import json
import httplib2
import threading
import os
import time
import traceback

from apiclient.discovery import build
from apiclient.errors import HttpError
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
from oauth2client.tools import run_flow, run

"""
yum -y install python-httplib2
easy_install python_gflags
easy_install google_api_python_client
"""

PROJECT_NUMBER = '549187599759'

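# OAuth2 flow built from the wizard's client secrets; run_query_raw() runs this
# flow whenever the cached credentials are missing or invalid.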
try:
    FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
                                   scope='https://www.googleapis.com/auth/bigquery')
except Exception:
    print "exception while initializing bigquery flow"
    traceback.print_exc()
    FLOW = None

MINUTE_MS = 60 * 1000
HOUR_MS = 60 * 60 * 1000

class BigQueryAnalytics:
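    """Thin wrapper around the BigQuery v2 API for PlanetStack HPC analytics.

    Logical field names (written as %tokens in queries) are translated to
    physical BigQuery column names using a mapping fetched from the
    cloud-scrutiny allocation service.
    """
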
    def __init__(self, table="demoevents"):
        self.projectName = "vicci"
        self.tableName = table
        self.mapping = json.loads(self.fetch_mapping(table=self.tableName))
        self.reverse_mapping = {v: k for k, v in self.mapping.items()}

    def fetch_mapping(self, m=0, table="events"):
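        """Fetch the logical-to-physical column-name mapping for `table`.

        The mapping is served as JSON by the cloud-scrutiny App Engine
        service; `m` selects the multiplexer.
        """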
        req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s' % (m, table)
        resp = requests.get(req)
        if resp.status_code == 200:
            return resp.text
        else:
            raise Exception('Error accessing register allocations: %d' % resp.status_code)

    def run_query_raw(self, query):
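        """Run `query` synchronously against BigQuery and return the raw response.

        %tokens in the query are first rewritten to physical column names, then
        cached OAuth2 credentials are loaded (running the authorization flow if
        they are missing or invalid) and a jobs().query request is issued.
        """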
        # Rewrite %tokens (e.g. %hostname) to physical BigQuery column names.
        p = re.compile('%[a-zA-Z_]*')
        query = p.sub(self.remap, query)

        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            credentials = run(FLOW, storage)

        http = httplib2.Http()
        http = credentials.authorize(http)

        service = build('bigquery', 'v2', http=http)

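        # Synchronous query; timeoutMs lets the call wait up to 30 seconds for
        # the query to complete before returning.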
        body = {"query": query,
                "timeoutMs": 30000}
        response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

        return response

    def translate_schema(self, response):
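        """Rewrite schema field names in `response` back to their logical names."""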
        for field in response["schema"]["fields"]:
            field["name"] = self.reverse_mapping.get(field["name"], field["name"])

    def run_query(self, query):
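        """Run `query` and return the rows as a list of dicts keyed by logical field name."""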
        response = self.run_query_raw(query)

        fieldNames = []
        for field in response["schema"]["fields"]:
            fieldNames.append(field["name"])

        result = []
        if "rows" in response:
            for row in response["rows"]:
                this_result = {}
                for (i, column) in enumerate(row["f"]):
                    this_result[self.reverse_mapping.get(fieldNames[i], fieldNames[i])] = column["v"]
                result.append(this_result)

        return result

    def remap(self, match):
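        """re.sub callback: map a %token to its BigQuery column name via self.mapping."""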
        token = match.group()[1:]
        if token in self.mapping:
            return self.mapping[token]
        else:
            raise Exception('unknown token %s' % token)

    def dump_table(self, rows, keys=None):
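        """Print `rows` (a list of dicts) as a column-aligned text table."""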
        if not keys:
            keys = rows[0].keys()

        lens = {}
        for key in keys:
            lens[key] = len(key)

        for row in rows:
            for key in keys:
                thislen = len(str(row.get(key, "")))
                lens[key] = max(lens.get(key, 0), thislen)

        for key in keys:
            print "%*s" % (lens[key], key),
        print

        for row in rows:
            for key in keys:
                print "%*s" % (lens[key], str(row.get(key, ""))),
            print

def main():
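    # Example: total bytes sent per node in vicci.demoevents, using %tokens
    # that remap() translates into physical column names.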
    bq = BigQueryAnalytics()

    rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")

    bq.dump_table(rows)

if __name__ == "__main__":
    main()