44b5dbdb9cb08c91654b4ed6c72c191eecafd47a
[plstackapi.git] / planetstack / hpc_wizard / bigquery_analytics.py
1 import re
2 import base64
3 import requests
4 import urllib
5 import json
6 import httplib2
7 import threading
8 import os
9 import time
10 import traceback
11
12 from apiclient.discovery import build
13 from apiclient.errors import HttpError
14 from oauth2client.client import AccessTokenRefreshError
15 from oauth2client.client import OAuth2WebServerFlow
16 from oauth2client.client import flow_from_clientsecrets
17 from oauth2client.file import Storage
18 from oauth2client.tools import run_flow,run
19
20 """
21 yum -y install python-httplib2
22 easy_install python_gflags
23 easy_install google_api_python_client
24 """
25
# Google Cloud project number; passed as ``projectId`` on BigQuery job requests.
PROJECT_NUMBER = '549187599759'

# Build the OAuth2 flow once at import time.  Failure is tolerated so the
# module can still be imported: the error is reported and FLOW is left as
# None.  Only ordinary errors are caught -- KeyboardInterrupt and SystemExit
# are allowed to propagate instead of being swallowed.
try:
    FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
                                   scope='https://www.googleapis.com/auth/bigquery')
except Exception:
    # Single-argument parenthesised print: same output whether ``print`` is
    # the Python 2 statement or the print function.
    print("exception while initializing bigquery flow")
    traceback.print_exc()
    FLOW = None

# Number of milliseconds in one minute / one hour.
MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000
38
class BigQueryAnalytics:
    """Run BigQuery queries that are written with symbolic ``%name`` tokens.

    On construction a JSON mapping is downloaded (see ``fetch_mapping``).
    Every ``%name`` token in a query is replaced with ``mapping[name]``
    before the query is submitted, and field names in results are translated
    back through the inverted mapping.
    """

    # A '%' followed by a (possibly empty) run of ASCII letters/underscores.
    # The upper bound of the second range must be 'Z': 'A-z' would also match
    # the characters [ \ ] ^ and backtick, so '%hostname]' would be read as
    # the token 'hostname]'.  A bare '%' still matches with an empty token
    # name and is rejected by remap().
    _TOKEN_RE = re.compile(r'%[a-zA-Z_]*')

    def __init__(self, table = "demoevents"):
        """Download the mapping for *table* and build its inverse.

        Note that this performs an HTTP request (via ``fetch_mapping``).
        """
        self.projectName = "vicci"  # not read anywhere else in this class
        self.tableName = table
        # token name -> value substituted into queries
        self.mapping = json.loads(self.fetch_mapping(table=self.tableName))
        # substituted value -> token name, used to relabel result fields
        self.reverse_mapping = {v:k for k, v in self.mapping.items()}

    def fetch_mapping(self, m=0, table="events"):
        """Return the body of the ``get_allocations`` response as text.

        m     -- value sent as the ``multiplexer`` query parameter (integer)
        table -- value sent as the ``table`` query parameter

        Raises Exception if the HTTP status is not 200.
        """
        req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table)
        resp = requests.get(req)
        if resp.status_code != 200:
            raise Exception('Error accessing register allocations: %d'%resp.status_code)
        return resp.text

    def _expand_tokens(self, query):
        """Return *query* with every ``%name`` token replaced via ``remap``."""
        return self._TOKEN_RE.sub(self.remap, query)

    def run_query_raw(self, query):
        """Expand tokens in *query*, submit it, and return the raw response.

        Credentials are read from the on-disk store; if they are missing or
        invalid the OAuth flow is run to obtain new ones.

        Raises Exception if a token is unknown, or if new credentials are
        needed but the module-level FLOW failed to initialise.
        """
        query = self._expand_tokens(query)

        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            if FLOW is None:
                # FLOW is None when flow_from_clientsecrets failed at import;
                # fail with a clear message instead of deep inside run().
                raise Exception('bigquery flow was not initialized; cannot obtain credentials')
            credentials = run(FLOW, storage)

        http = credentials.authorize(httplib2.Http())
        service = build('bigquery', 'v2', http=http)

        body = {"query": query}
        return service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

    def translate_schema(self, response):
        """Rename, in place, each schema field of *response* via the reverse mapping.

        Field names with no reverse-mapping entry are left unchanged.
        """
        for field in response["schema"]["fields"]:
            field["name"] = self.reverse_mapping.get(field["name"], field["name"])

    def run_query(self, query):
        """Run *query* and return its rows as a list of dicts.

        Each dict maps a field name (translated through the reverse mapping
        when an entry exists) to that column's ``"v"`` value.  Returns an
        empty list when the response carries no ``"rows"`` key.
        """
        response = self.run_query_raw(query)

        # Translate the column labels once rather than once per row.
        labels = []
        for field in response["schema"]["fields"]:
            labels.append(self.reverse_mapping.get(field["name"], field["name"]))

        result = []
        if "rows" in response:
            for row in response["rows"]:
                this_result = {}
                for (i,column) in enumerate(row["f"]):
                    this_result[labels[i]] = column["v"]
                result.append(this_result)

        return result

    def remap(self, match):
        """Regex substitution callback: map a matched ``%name`` to its value.

        Raises Exception if the name (the match minus its leading '%') is
        not present in the mapping.
        """
        token = match.group()[1:]
        if token in self.mapping:
            return self.mapping[token]
        raise Exception('unknown token %s' % token)

    def dump_table(self, rows, keys=None):
        """Print *rows* (a list of dicts) as a right-aligned text table.

        keys -- column order; defaults to the keys of the first row.

        Prints nothing when there are no rows and no explicit keys, since
        there is then nothing to derive the columns from.
        """
        if not keys:
            if not rows:
                return
            keys = rows[0].keys()

        # Column width = longest of the header and every rendered cell.
        lens = {}
        for key in keys:
            lens[key] = len(key)

        for row in rows:
            for key in keys:
                thislen = len(str(row.get(key,"")))
                lens[key] = max(lens.get(key,0), thislen)

        # Each line is assembled first and printed with a single-argument
        # print(...), which behaves the same as a Python 2 print statement
        # and as the print function.
        print(" ".join("%*s" % (lens[key], key) for key in keys))

        for row in rows:
            print(" ".join("%*s" % (lens[key], str(row.get(key,""))) for key in keys))
def main():
    """Query total bytes sent per hostname and print the result as a table."""
    analytics = BigQueryAnalytics()
    per_host_query = "select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname"
    analytics.dump_table(analytics.run_query(per_host_query))


# Run the demo query only when executed as a script, not when imported.
if __name__ == "__main__":
    main()