ca08025133071c50d340d8f753838fa86821e930
[plstackapi.git] / planetstack / hpc_wizard / bigquery_analytics.py
1 import re
2 import base64
3 import requests
4 import urllib
5 import json
6 import httplib2
7 import threading
8 import os
9 import time
10 import traceback
11
12 from apiclient.discovery import build
13 from apiclient.errors import HttpError
14 from oauth2client.client import AccessTokenRefreshError
15 from oauth2client.client import OAuth2WebServerFlow
16 from oauth2client.client import flow_from_clientsecrets
17 from oauth2client.file import Storage
18 from oauth2client.tools import run_flow,run
19
20 """
21 yum -y install python-httplib2
22 easy_install python_gflags
23 easy_install google_api_python_client
24 """
25
# Numeric ID of the Google Cloud project that BigQuery jobs are submitted to.
PROJECT_NUMBER = '549187599759'

# OAuth2 client flow (BigQuery scope) built from the installed client
# secrets at import time; it is run when no valid stored credentials exist.
FLOW = flow_from_clientsecrets(
    '/opt/planetstack/hpc_wizard/client_secrets.json',
    scope='https://www.googleapis.com/auth/bigquery')
30
# Durations expressed in milliseconds.
MINUTE_MS = 60 * 1000
HOUR_MS = 60 * MINUTE_MS
33
class BigQueryAnalytics:
    """Run BigQuery queries written with friendly field names.

    Queries may reference fields as ``%name``. Before a query is sent, each
    such token is replaced with the column name found in ``self.mapping``,
    which is fetched from the cloud-scrutiny allocation service. Column names
    in results are translated back through ``self.reverse_mapping``.
    """

    # A field token is '%' followed by one or more ASCII letters or
    # underscores. 'A-Z' is spelled out because the range 'A-z' would also
    # match '[', '\', ']', '^' and '`' (e.g. swallowing the ']' of
    # '[%table]'), and '+' keeps a bare '%' (such as the modulo operator)
    # from being treated as an empty, unmappable token.
    _TOKEN_RE = re.compile(r'%[a-zA-Z_]+')

    def __init__(self, table="demoevents"):
        """Fetch the field mapping for *table* and build its inverse.

        Args:
            table: table name passed to the allocation service.
        """
        self.projectName = "vicci"
        self.tableName = table
        # friendly name -> column name substituted into queries
        self.mapping = json.loads(self.fetch_mapping(table=self.tableName))
        # column name -> friendly name, used to label result columns
        self.reverse_mapping = {v: k for k, v in self.mapping.items()}

    def fetch_mapping(self, m=0, table="events"):
        """Return the raw JSON text of the field allocations for *table*.

        Args:
            m: multiplexer number, sent as the ``multiplexer`` query parameter.
            table: table name, sent as the ``table`` query parameter.

        Raises:
            Exception: if the service responds with a non-200 status code.
        """
        # Let requests build and URL-encode the query string; a list of
        # pairs keeps the parameter order stable.
        params = [
            ("action", "get_allocations"),
            ("multiplexer", "%d" % m),
            ("table", table),
        ]
        resp = requests.get('http://cloud-scrutiny.appspot.com/command',
                            params=params)
        if resp.status_code != 200:
            raise Exception('Error accessing register allocations: %d'
                            % resp.status_code)
        return resp.text

    def _remap_query(self, query):
        """Return *query* with every ``%name`` token replaced via ``remap``."""
        return self._TOKEN_RE.sub(self.remap, query)

    def run_query_raw(self, query):
        """Remap field tokens in *query*, run it, and return the raw response.

        Credentials are loaded from the local credential file; if none are
        stored, or they are invalid, ``FLOW`` is run to obtain new ones.
        """
        query = self._remap_query(query)

        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()
        if credentials is None or credentials.invalid:
            credentials = run(FLOW, storage)

        http = credentials.authorize(httplib2.Http())
        service = build('bigquery', 'v2', http=http)

        # NOTE(review): per the BigQuery API, jobs.query may return before the
        # job completes (jobComplete false, no schema/rows) and may page large
        # results (pageToken); neither case is handled here.
        body = {"query": query}
        return service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

    def translate_schema(self, response):
        """Rename, in place, the schema fields of *response* to friendly names.

        Fields with no entry in ``self.reverse_mapping`` keep their name.
        """
        for field in response["schema"]["fields"]:
            field["name"] = self.reverse_mapping.get(field["name"], field["name"])

    def run_query(self, query):
        """Run *query* and return its rows as a list of dicts.

        Each dict maps a column name (translated back to its friendly name
        when a mapping exists) to that row's cell value from the response.
        """
        return self._rows_from_response(self.run_query_raw(query))

    def _rows_from_response(self, response):
        """Convert a raw query response into a list of row dicts."""
        # Translate each column name once instead of once per cell.
        names = [self.reverse_mapping.get(field["name"], field["name"])
                 for field in response["schema"]["fields"]]
        return [dict(zip(names, (cell["v"] for cell in row["f"])))
                for row in response.get("rows", [])]

    def remap(self, match):
        """Return the column name for a ``%name`` regex match.

        Raises:
            Exception: if ``name`` is not a key of ``self.mapping``.
        """
        token = match.group()[1:]  # drop the leading '%'
        if token not in self.mapping:
            raise Exception('unknown token %s' % token)
        return self.mapping[token]

    def dump_table(self, rows, keys=None):
        """Print *rows* as right-aligned, space-separated columns.

        Args:
            rows: list of dicts, e.g. as returned by ``run_query``.
            keys: columns to print, in order; defaults to the keys of the
                first row.

        Prints nothing when there are no rows and no ``keys`` were given.
        """
        if not keys:
            if not rows:
                # No row to infer the column names from.
                return
            keys = list(rows[0].keys())

        # Each column is as wide as its header or its widest value.
        widths = {}
        for key in keys:
            widths[key] = max([len(key)] +
                              [len(str(row.get(key, ""))) for row in rows])

        # Build whole lines with join; print() with a single argument
        # behaves the same under Python 2 and Python 3.
        print(" ".join("%*s" % (widths[key], key) for key in keys))
        for row in rows:
            print(" ".join("%*s" % (widths[key], str(row.get(key, "")))
                           for key in keys))
118
def main():
    """Print the summed bytes_sent per hostname from [vicci.demoevents]."""
    analytics = BigQueryAnalytics()
    query = "select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname"
    analytics.dump_table(analytics.run_query(query))


if __name__ == "__main__":
    main()