12 from apiclient.discovery import build
13 from apiclient.errors import HttpError
14 from oauth2client.client import AccessTokenRefreshError
15 from oauth2client.client import OAuth2WebServerFlow
16 from oauth2client.client import flow_from_clientsecrets
17 from oauth2client.file import Storage
18 from oauth2client.tools import run_flow,run
# Prerequisites (install before using this module):
#   yum -y install python-httplib2
#   easy_install python_gflags
#   easy_install google_api_python_client
# Project identifier passed as projectId on every BigQuery jobs().query() call.
PROJECT_NUMBER = '549187599759'

# Build the OAuth2 flow once, at import time, from the client-secrets file.
# A failure here is reported but must not prevent the module from being
# imported, so FLOW is always bound: it is None when initialisation failed.
try:
    FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
                                   scope='https://www.googleapis.com/auth/bigquery')
except Exception:
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print("exception while initializing bigquery flow")
    FLOW = None
39 # global to hold cached mappings
class MappingException(Exception):
    """Raised when a table or a ``%token`` has no entry in the cached mappings."""
    pass
class BigQueryAnalytics:
    """Run BigQuery queries whose columns are written as symbolic ``%token`` names.

    Each ``%token`` in a query is replaced by the value stored for that token
    in the module-level ``mappings`` cache (keyed by table name) before the
    query is submitted.  The inverse cache, ``reverse_mappings``, is used to
    translate column names in the response back to the symbolic names.

    Attributes:
        projectName: fixed to "vicci"; not read by the methods of this class.
        tableName: key used to look up this instance's entry in both caches.
    """

    # A token is '%' followed by letters/underscores.  The range is A-Z on
    # purpose: 'A-z' would also accept the punctuation that sits between 'Z'
    # and 'a' in ASCII ('[', '\\', ']', '^', '`'), so "[%tok]" would swallow
    # the closing bracket.
    _TOKEN_RE = re.compile(r'%[a-zA-Z_]*')

    def __init__(self, table = "demoevents"):
        """Remember which table's mapping this instance works with."""
        self.projectName = "vicci"
        self.tableName = table

    def _reverse_mapping(self):
        """Return this table's cached reverse mapping, or {} if none is loaded."""
        return reverse_mappings.get(self.tableName, {})

    def reload_mapping(self):
        """Fetch this table's mapping and refresh both module-level caches."""
        mappings[self.tableName] = json.loads(self.fetch_mapping(table=self.tableName))
        reverse_mappings[self.tableName] = {v:k for k, v in mappings[self.tableName].items()}

    def fetch_mapping(self, m=0, table="events"):
        """Download the allocation mapping for *table* and return the response text.

        Args:
            m: integer sent as the ``multiplexer`` query parameter.
            table: value sent as the ``table`` query parameter.

        Raises:
            Exception: if the server answers with a status other than 200.
        """
        req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table)
        resp = requests.get(req)
        if resp.status_code != 200:
            raise Exception('Error accessing register allocations: %d'%resp.status_code)
        return resp.text

    def run_query_raw(self, query):
        """Substitute ``%token`` names in *query*, submit it, and return the raw response.

        If substitution fails with MappingException the mapping is reloaded
        once and substitution is retried; a second failure propagates.
        """
        try:
            query = self._TOKEN_RE.sub(self.remap, query)
        except MappingException:
            # The cached mapping is missing or lacks a token: refresh and retry once.
            self.reload_mapping()
            query = self._TOKEN_RE.sub(self.remap, query)

        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            credentials = run(FLOW, storage)

        http = credentials.authorize(httplib2.Http())
        service = build('bigquery', 'v2', http=http)

        # NOTE(review): only the "query" entry of this dict was visible in the
        # listing this block was rebuilt from; the timeout entry is assumed --
        # confirm the key and value against the deployed version.
        body = {"query": query,
                "timeoutMs": 60000}
        response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

        return response

    def translate_schema(self, response):
        """Rename, in place, every schema field of *response* to its symbolic name.

        Field names without a reverse mapping are left unchanged.
        """
        reverse = self._reverse_mapping()
        for field in response["schema"]["fields"]:
            field["name"] = reverse.get(field["name"], field["name"])

    def run_query(self, query):
        """Run *query* and return its rows as a list of dicts.

        Each dict maps a column name (translated to its symbolic name when a
        reverse mapping exists) to that column's "v" value.  A response
        without "rows" yields an empty list.
        """
        response = self.run_query_raw(query)

        # Looked up after the query so a mapping reloaded during substitution is seen.
        reverse = self._reverse_mapping()
        columnNames = [reverse.get(field["name"], field["name"])
                       for field in response["schema"]["fields"]]

        result = []
        for row in response.get("rows", []):
            result.append({columnNames[i]: column["v"]
                           for (i, column) in enumerate(row["f"])})

        return result

    def remap(self, match):
        """Regex substitution callback: return the mapped name for a ``%token`` match.

        Raises:
            MappingException: if no mapping is cached for this table, or the
                token is not present in it.
        """
        if self.tableName not in mappings:
            raise MappingException("no mapping for table %s" % self.tableName)

        token = match.group()[1:]   # drop the leading '%'
        try:
            return mappings[self.tableName][token]
        except KeyError:
            raise MappingException('unknown token %s' % token)

    def dump_table(self, rows, keys=None):
        """Print *rows* (a list of dicts) as right-aligned, space-separated columns.

        Args:
            rows: list of dicts; missing keys are printed as empty cells.
            keys: columns to print, in order.  Defaults to the keys of the
                first row.  With no keys and no rows, nothing is printed.
        """
        if not keys:
            if not rows:
                return
            keys = list(rows[0].keys())

        # Column width = widest of the header and every cell in that column.
        widths = dict((key, len(key)) for key in keys)
        for row in rows:
            for key in keys:
                widths[key] = max(widths[key], len(str(row.get(key,""))))

        # One print per line gives the same output on Python 2 and Python 3.
        print(" ".join("%*s" % (widths[key], key) for key in keys))
        for row in rows:
            print(" ".join("%*s" % (widths[key], str(row.get(key,""))) for key in keys))

    def schema_to_cols(self, schema):
        """Convert a response schema into a list of column-description dicts.

        Each entry carries the column "type" (translated through colTypes)
        and a "label" (the symbolic name when a reverse mapping exists).

        Raises:
            KeyError: if a field's type is not one of the keys of colTypes.
        """
        colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"}
        reverse = self._reverse_mapping()

        cols = []
        for (i, field) in enumerate(schema["fields"]):
            # NOTE(review): the entry between "type" and "label" was not
            # visible in the listing this block was rebuilt from; a positional
            # "id" of the form Col<i> is assumed -- confirm against callers.
            col = {"type": colTypes[field["type"]],
                   "id": "Col%d" % i,
                   "label": reverse.get(field["name"],field["name"])}
            cols.append(col)

        return cols
def main():
    """Run a sample query (bytes sent, summed per hostname) and print the result."""
    bq = BigQueryAnalytics()

    rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")

    # NOTE(review): the statement that consumed `rows` was not visible in the
    # listing this block was rebuilt from; printing the table is assumed.
    bq.dump_table(rows)


# Only query BigQuery when executed as a script, never on import.
if __name__ == "__main__":
    main()