12 from apiclient.discovery import build
13 from apiclient.errors import HttpError
14 from oauth2client.client import AccessTokenRefreshError
15 from oauth2client.client import OAuth2WebServerFlow
16 from oauth2client.client import flow_from_clientsecrets
17 from oauth2client.file import Storage
18 from oauth2client.tools import run_flow,run
21 yum -y install python-httplib2
22 easy_install python_gflags
23 easy_install google_api_python_client
26 PROJECT_NUMBER = '549187599759'
29 FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
30 scope='https://www.googleapis.com/auth/bigquery')
32 print "exception while initializing bigquery flow"
39 # global to hold cached mappings
class MappingException(Exception):
    """Raised when a %token in a query cannot be translated to a BigQuery
    column name (no mapping loaded for the table, or unknown token)."""
    pass
class BigQueryAnalytics:
    """Thin wrapper around the Google BigQuery API for the 'vicci' project.

    Translates %token column references into real BigQuery column names
    (using the module-level mapping caches) and post-processes query
    results (filtering, grouping, computed fields).
    """

    def __init__(self, table="demoevents"):
        # The project is fixed; only the table is configurable per instance.
        self.projectName = "vicci"
        self.tableName = table
60 def reload_mapping(self):
61 global mappings, reverse_mappings
62 mappings[self.tableName] = json.loads(self.fetch_mapping(table=self.tableName))
63 reverse_mappings[self.tableName] = {v:k for k, v in mappings[self.tableName].items()}
65 def fetch_mapping(self, m=0, table="events"):
66 req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table)
67 resp = requests.get(req)
68 if (resp.status_code==200):
71 raise Exception('Error accessing register allocations: %d'%resp.status_code)
73 def run_query_raw(self, query):
75 file("/tmp/query_log","a").write("query %s\n" % query)
79 p = re.compile('%[a-zA-z_]*')
82 query = p.sub(self.remap, query)
83 except MappingException:
85 query = p.sub(self.remap, query)
88 file("/tmp/query_log","a").write("remapped query %s\n" % query)
92 storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
93 credentials = storage.get()
95 if credentials is None or credentials.invalid:
96 credentials = run(FLOW, storage)
98 http = httplib2.Http()
99 http = credentials.authorize(http)
101 service = build('bigquery', 'v2', http=http)
103 body = {"query": query,
105 response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()
109 def translate_schema(self, response):
110 for field in response["schema"]["fields"]:
111 field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"])
113 def run_query(self, query):
114 response = self.run_query_raw(query)
117 for field in response["schema"]["fields"]:
118 fieldNames.append(field["name"])
121 if "rows" in response:
122 for row in response["rows"]:
124 for (i,column) in enumerate(row["f"]):
125 this_result[reverse_mappings[self.tableName].get(fieldNames[i],fieldNames[i])] = column["v"]
126 result.append(this_result)
130 """ Filter_results, groupby_results, do_computed_fields, and postprocess_results
131 are all used for postprocessing queries. The idea is to do one query that
132 includes the ungrouped and unfiltered data, and cache it for multiple
133 consumers who will filter and group it as necessary.
TODO: Find a more generalized source for these sorts of operations. Perhaps
136 put the results in SQLite and then run SQL queries against it.
139 def filter_results(self, rows, name, value):
140 result = [row for row in rows if row.get(name)==value]
143 def groupby_results(self, rows, groupBy=[], sum=[], count=[], avg=[], maxi=[]):
146 groupby_key = [row.get(k, None) for k in groupBy]
148 if str(groupby_key) not in new_rows:
151 new_row[k] = row.get(k, None)
153 new_rows[str(groupby_key)] = new_row
155 new_row = new_rows[str(groupby_key)]
158 new_row["sum_" + k] = new_row.get("sum_" + k, 0) + to_number(row.get(k,0))
161 new_row["avg_" + k] = new_row.get("avg_" + k, 0) + to_number(row.get(k,0))
162 new_row["avg_base_" + k] = new_row.get("avg_base_"+k,0) + 1
165 new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k,0)))
169 dl = new_row["distinct_" + k] = new_row.get("distinct_" + k, [])
173 #new_row["count_" + k] = new_row.get("count_" + k, 0) + 1
175 for row in new_rows.values():
177 row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k]
178 del row["avg_base_" + k]
181 new_row["count_" + k] = len(new_row.get("distinct_" + k, []))
183 return new_rows.values()
185 def do_computed_fields(self, rows, computed=[]):
186 computedFieldNames=[]
191 computedFieldName = "computed_" + parts[0].replace("%","")+"_div_"+parts[1].replace("%","")
193 row[computedFieldName] = to_number(row[parts[0]]) / to_number(row[parts[1]])
197 if computedFieldName not in computedFieldNames:
198 computedFieldNames.append(computedFieldName)
199 return (computedFieldNames, rows)
201 def postprocess_results(self, rows, filter={}, groupBy=[], sum=[], count=[], avg=[], computed=[], maxi=[], maxDeltaTime=None):
202 sum = [x.replace("%","") for x in sum]
203 count = [x.replace("%","") for x in count]
204 avg = [x.replace("%","") for x in avg]
205 computed = [x.replace("%","") for x in computed]
206 maxi = [x.replace("%","") for x in maxi]
207 groupBy = [x.replace("%","") for x in groupBy]
209 for (k,v) in filter.items():
210 rows = self.filter_results(rows, k, v)
213 if maxDeltaTime is not None:
214 maxTime = max([float(row["time"]) for row in rows])
215 rows = [row for row in rows if float(row["time"])>=maxTime-maxDeltaTime]
217 (computedFieldNames, rows) = self.do_computed_fields(rows, computed)
218 sum = sum + computedFieldNames
219 rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
222 def remap(self, match):
223 if not self.tableName in mappings:
224 raise MappingException("no mapping for table %s" % self.tableName)
226 mapping = mappings[self.tableName]
228 token = match.group()[1:]
230 return mapping[token]
232 raise MappingException('unknown token %s' % token)
234 def dump_table(self, rows, keys=None):
236 keys = rows[0].keys()
244 thislen = len(str(row.get(key,"")))
245 lens[key] = max(lens.get(key,0), thislen)
248 print "%*s" % (lens[key], key),
253 print "%*s" % (lens[key], str(row.get(key,""))),
256 def schema_to_cols(self, schema):
257 fields = schema["fields"]
259 colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"}
264 col = {"type": colTypes[field["type"]],
266 "label": reverse_mappings[self.tableName].get(field["name"],field["name"])}
def main():
    """Smoke test: run a sample grouped query against the demo table and
    dump the result as a text table."""
    bq = BigQueryAnalytics()

    rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")
    bq.dump_table(rows)

if __name__ == "__main__":
    main()