import json
import re

import httplib2
import requests

from apiclient.discovery import build
from apiclient.errors import HttpError
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
from oauth2client.tools import run_flow, run
# Dependencies (install on the host that runs this script):
#   yum -y install python-httplib2
#   easy_install python_gflags
#   easy_install google_api_python_client

PROJECT_NUMBER = '549187599759'

try:
    FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
                                   scope='https://www.googleapis.com/auth/bigquery')
except:
    print "exception while initializing bigquery flow"
    FLOW = None

# globals to hold cached mappings (table name -> {field name: allocated column} and the inverse)
mappings = {}
reverse_mappings = {}

def to_number(s):
    # BigQuery returns cell values as strings; coerce them to int/float, defaulting to 0
    try:
        if "." in str(s):
            return float(s)
        return int(s)
    except (ValueError, TypeError):
        return 0

class MappingException(Exception):
    pass

class BigQueryAnalytics:
    def __init__(self, table="demoevents"):
        self.projectName = "vicci"
        self.tableName = table

    def reload_mapping(self):
        global mappings, reverse_mappings
        mappings[self.tableName] = json.loads(self.fetch_mapping(table=self.tableName))
        reverse_mappings[self.tableName] = {v: k for k, v in mappings[self.tableName].items()}
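
    # Illustrative only: the mapping fetched above is assumed to be a JSON object of
    # the form {"hostname": "s", "bytes_sent": "i0", ...} (column names hypothetical),
    # i.e. friendly field name -> allocated BigQuery column; reverse_mappings simply
    # inverts whatever the allocation service returns.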

    def fetch_mapping(self, m=0, table="events"):
        req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s' % (m, table)
        resp = requests.get(req)
        if (resp.status_code == 200):
            return resp.text
        else:
            raise Exception('Error accessing register allocations: %d' % resp.status_code)

    def run_query_raw(self, query):
        # keep a log of raw and remapped queries for debugging
        file("/tmp/query_log", "a").write("query %s\n" % query)

        # substitute %token placeholders with the allocated BigQuery column names
        p = re.compile('%[a-zA-Z_]*')

        try:
            query = p.sub(self.remap, query)
        except MappingException:
            # no mapping cached for this table yet; load it and retry once
            self.reload_mapping()
            query = p.sub(self.remap, query)

        file("/tmp/query_log", "a").write("remapped query %s\n" % query)

        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            credentials = run(FLOW, storage)

        http = httplib2.Http()
        http = credentials.authorize(http)

        service = build('bigquery', 'v2', http=http)

        body = {"query": query}
        response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

        return response

    def translate_schema(self, response):
        # rewrite the BigQuery column names in the response schema back to friendly names
        for field in response["schema"]["fields"]:
            field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"])

    def run_query(self, query):
        response = self.run_query_raw(query)

        fieldNames = []
        for field in response["schema"]["fields"]:
            fieldNames.append(field["name"])

        result = []
        if "rows" in response:
            for row in response["rows"]:
                this_result = {}
                for (i, column) in enumerate(row["f"]):
                    this_result[reverse_mappings[self.tableName].get(fieldNames[i], fieldNames[i])] = column["v"]
                result.append(this_result)

        return result
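
    # Illustrative only: run_query() returns a list of dicts keyed by the reverse-mapped
    # field names, e.g. [{"hostname": "node1", "bytes_sent": "1024"}, ...] (values
    # hypothetical). BigQuery hands back cell values as strings, so the postprocessing
    # helpers below convert them with to_number() where arithmetic is needed.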
130 """ Filter_results, groupby_results, do_computed_fields, and postprocess_results
131 are all used for postprocessing queries. The idea is to do one query that
132 includes the ungrouped and unfiltered data, and cache it for multiple
133 consumers who will filter and group it as necessary.
135 TODO: Find a more generalized source for these sorts operations. Perhaps
136 put the results in SQLite and then run SQL queries against it.
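
    # Sketch of the intended flow (field names hypothetical): issue one broad query,
    # cache the rows, then let each consumer post-process its own view, e.g.
    #
    #   rows = bq.run_query("select %hostname, %bytes_sent, %time from [vicci.demoevents]")
    #   per_host = bq.postprocess_results(rows,
    #                                     groupBy=["hostname"],
    #                                     sum=["%bytes_sent"],
    #                                     maxDeltaTime=3600)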

    def filter_results(self, rows, name, value):
        result = [row for row in rows if row.get(name) == value]
        return result

    def groupby_results(self, rows, groupBy=[], sum=[], count=[], avg=[], maxi=[]):
        new_rows = {}
        for row in rows:
            groupby_key = [row.get(k, None) for k in groupBy]

            if str(groupby_key) not in new_rows:
                new_row = {}
                for k in groupBy:
                    new_row[k] = row.get(k, None)
                new_rows[str(groupby_key)] = new_row
            else:
                new_row = new_rows[str(groupby_key)]

            for k in sum:
                new_row["sum_" + k] = new_row.get("sum_" + k, 0) + to_number(row.get(k, 0))
            for k in avg:
                new_row["avg_" + k] = new_row.get("avg_" + k, 0) + to_number(row.get(k, 0))
                new_row["avg_base_" + k] = new_row.get("avg_base_" + k, 0) + 1
            for k in maxi:
                new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k, 0)))
            for k in count:
                # accumulate the distinct values seen for this group
                dl = new_row["distinct_" + k] = new_row.get("distinct_" + k, [])
                if row.get(k, None) not in dl:
                    dl.append(row.get(k, None))

        for row in new_rows.values():
            for k in avg:
                row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k]
                del row["avg_base_" + k]
            for k in count:
                row["count_" + k] = len(row.get("distinct_" + k, []))

        return new_rows.values()
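
    # Illustrative only (hypothetical data): grouping
    #   [{"site": "A", "bytes_sent": "10"}, {"site": "A", "bytes_sent": "5"}, {"site": "B", "bytes_sent": "1"}]
    # with groupBy=["site"], sum=["bytes_sent"] yields
    #   [{"site": "A", "sum_bytes_sent": 15}, {"site": "B", "sum_bytes_sent": 1}]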

    def do_computed_fields(self, rows, computed=[]):
        # each computed field is "numerator/denominator"; the quotient is added to every row
        computedFieldNames = []
        for row in rows:
            for k in computed:
                if "/" in k:
                    parts = k.split("/")
                    computedFieldName = "computed_" + parts[0].replace("%", "") + "_div_" + parts[1].replace("%", "")
                    try:
                        row[computedFieldName] = to_number(row[parts[0]]) / to_number(row[parts[1]])
                    except:
                        pass
                    if computedFieldName not in computedFieldNames:
                        computedFieldNames.append(computedFieldName)
        return (computedFieldNames, rows)

    def postprocess_results(self, rows, filter={}, groupBy=[], sum=[], count=[], avg=[], computed=[], maxi=[], maxDeltaTime=None):
        # queries use %-prefixed tokens, but cached rows are keyed by plain field names
        sum = [x.replace("%", "") for x in sum]
        count = [x.replace("%", "") for x in count]
        avg = [x.replace("%", "") for x in avg]
        computed = [x.replace("%", "") for x in computed]
        maxi = [x.replace("%", "") for x in maxi]

        for (k, v) in filter.items():
            rows = self.filter_results(rows, k, v)

        # optionally keep only the rows within maxDeltaTime of the newest sample
        if maxDeltaTime is not None:
            maxTime = max([float(row["time"]) for row in rows])
            rows = [row for row in rows if float(row["time"]) >= maxTime - maxDeltaTime]

        (computedFieldNames, rows) = self.do_computed_fields(rows, computed)
        sum = sum + computedFieldNames
        rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
        return rows

    def remap(self, match):
        if not self.tableName in mappings:
            raise MappingException("no mapping for table %s" % self.tableName)

        mapping = mappings[self.tableName]

        # match.group() is "%token"; strip the "%" and look up the allocated column name
        token = match.group()[1:]
        if token in mapping:
            return mapping[token]
        else:
            raise MappingException('unknown token %s' % token)
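
    # Illustrative only: with a cached mapping of {"hostname": "s"} (hypothetical),
    # remap() rewrites the "%hostname" token in a query to the column name "s";
    # an unknown token raises MappingException, which run_query_raw() handles by
    # reloading the mapping and retrying the substitution.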

    def dump_table(self, rows, keys=None):
        if not keys:
            keys = rows[0].keys()

        # column widths: wide enough for the header and every value
        lens = dict((key, len(key)) for key in keys)
        for row in rows:
            for key in keys:
                thislen = len(str(row.get(key, "")))
                lens[key] = max(lens.get(key, 0), thislen)

        for key in keys:
            print "%*s" % (lens[key], key),
        print
        for row in rows:
            for key in keys:
                print "%*s" % (lens[key], str(row.get(key, ""))),
            print

    def schema_to_cols(self, schema):
        fields = schema["fields"]

        colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"}

        cols = []
        for field in fields:
            col = {"type": colTypes[field["type"]],
                   "label": reverse_mappings[self.tableName].get(field["name"], field["name"])}
            cols.append(col)

        return cols

def main():
    bq = BigQueryAnalytics()

    rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")
    bq.dump_table(rows)

if __name__ == "__main__":
    main()