import re
import sys
import json
import httplib2
import requests

from apiclient.discovery import build
from apiclient.errors import HttpError
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
from oauth2client.tools import run_flow, run
# Dependencies, installed on the host that runs this module:
#   yum -y install python-httplib2
#   easy_install python_gflags
#   easy_install google_api_python_client
PROJECT_NUMBER = '549187599759'
try:
    FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
                                   scope='https://www.googleapis.com/auth/bigquery')
    BIGQUERY_AVAILABLE = True
except Exception:
    print >> sys.stderr, "exception while initializing bigquery flow"
    BIGQUERY_AVAILABLE = False
# globals that cache the mappings between logical field names and physical
# BigQuery column names, keyed by table name
mappings = {}
reverse_mappings = {}

def to_number(s):
    # BigQuery returns cell values as strings; coerce to int or float,
    # falling back to 0 for missing or malformed values
    try:
        if "." in str(s):
            return float(s)
        return int(s)
    except ValueError:
        return 0

class MappingException(Exception):
    pass
class BigQueryAnalytics:
    def __init__(self, table = "demoevents"):
        self.projectName = "vicci"
        self.tableName = table

    def reload_mapping(self):
        global mappings, reverse_mappings
        mappings[self.tableName] = json.loads(self.fetch_mapping(table=self.tableName))
        reverse_mappings[self.tableName] = {v: k for k, v in mappings[self.tableName].items()}
    def fetch_mapping(self, m=0, table="events"):
        req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s' % (m, table)
        resp = requests.get(req)
        if (resp.status_code == 200):
            return resp.text
        else:
            raise Exception('Error accessing register allocations: %d' % resp.status_code)
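    # A mapping is a JSON object from logical field names to physical column
    # names, e.g. (hypothetical values) {"hostname": "s0", "bytes_sent": "i2"};
    # remap() below uses it to rewrite %hostname -> s0 inside a query.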
    def run_query_raw(self, query):
        try:
            file("/tmp/query_log","a").write("query %s\n" % query)
        except:
            pass

        # %tokens (e.g. %hostname) are logical field names that must be
        # rewritten to physical column names before the query is submitted
        p = re.compile('%[a-zA-Z_]*')

        try:
            query = p.sub(self.remap, query)
        except MappingException:
            # the mapping may be missing or stale -- reload it and retry once
            self.reload_mapping()
            query = p.sub(self.remap, query)

        try:
            file("/tmp/query_log","a").write("remapped query %s\n" % query)
        except:
            pass
        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            credentials = run(FLOW, storage)

        http = httplib2.Http()
        http = credentials.authorize(http)

        service = build('bigquery', 'v2', http=http)

        body = {"query": query}
        response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

        return response
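    # The jobs().query() response (BigQuery API v2) carries the result schema
    # in response["schema"]["fields"] and the data in response["rows"], where
    # each row has the form {"f": [{"v": value}, ...]}; the helpers below
    # unpack that layout.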
    def translate_schema(self, response):
        # rewrite physical column names in the response schema back to logical names
        for field in response["schema"]["fields"]:
            field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"])
    def run_query(self, query):
        if not BIGQUERY_AVAILABLE:
            print >> sys.stderr, "bigquery_analytics: bigquery flow is not available. returning empty result."
            return []

        response = self.run_query_raw(query)

        fieldNames = []
        for field in response["schema"]["fields"]:
            fieldNames.append(field["name"])

        result = []
        if "rows" in response:
            for row in response["rows"]:
                this_result = {}
                for (i, column) in enumerate(row["f"]):
                    this_result[reverse_mappings[self.tableName].get(fieldNames[i], fieldNames[i])] = column["v"]
                result.append(this_result)

        return result
137 """ Filter_results, groupby_results, do_computed_fields, and postprocess_results
138 are all used for postprocessing queries. The idea is to do one query that
139 includes the ungrouped and unfiltered data, and cache it for multiple
140 consumers who will filter and group it as necessary.
142 TODO: Find a more generalized source for these sorts operations. Perhaps
143 put the results in SQLite and then run SQL queries against it.
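    # A minimal sketch of that flow (query, field names, and values are
    # hypothetical):
    #
    #   bq = BigQueryAnalytics()
    #   rows = bq.run_query("select %hostname,%site,%bytes_sent,%time from [vicci.demoevents]")
    #   # one consumer: total bytes per site
    #   per_site = bq.postprocess_results(rows, groupBy=["site"], sum=["%bytes_sent"])
    #   # another consumer: only recent samples from a single host
    #   recent = bq.postprocess_results(rows, filter={"hostname": "node1"}, maxDeltaTime=300)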
    def filter_results(self, rows, name, value):
        result = [row for row in rows if row.get(name) == value]
        return result
    def groupby_results(self, rows, groupBy=[], sum=[], count=[], avg=[], maxi=[]):
        new_rows = {}
        for row in rows:
            groupby_key = [row.get(k, None) for k in groupBy]

            if str(groupby_key) not in new_rows:
                new_row = {}
                for k in groupBy:
                    new_row[k] = row.get(k, None)
                new_rows[str(groupby_key)] = new_row
            else:
                new_row = new_rows[str(groupby_key)]

            for k in sum:
                new_row["sum_" + k] = new_row.get("sum_" + k, 0) + to_number(row.get(k, 0))
            for k in avg:
                new_row["avg_" + k] = new_row.get("avg_" + k, 0) + to_number(row.get(k, 0))
                new_row["avg_base_" + k] = new_row.get("avg_base_" + k, 0) + 1
            for k in maxi:
                new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k, 0)))
            for k in count:
                # count is "count distinct": accumulate the distinct values seen
                dl = new_row["distinct_" + k] = new_row.get("distinct_" + k, [])
                if row.get(k, None) not in dl:
                    dl.append(row.get(k, None))
                #new_row["count_" + k] = new_row.get("count_" + k, 0) + 1

        for row in new_rows.values():
            for k in avg:
                row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k]
                del row["avg_base_" + k]
            for k in count:
                row["count_" + k] = len(row.get("distinct_" + k, []))

        return new_rows.values()
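    # For example (hypothetical rows), groupBy=["site"] with sum=["bytes"]
    # turns [{"site": "A", "bytes": "10"}, {"site": "A", "bytes": "5"}]
    # into  [{"site": "A", "sum_bytes": 15}].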
    def do_computed_fields(self, rows, computed=[]):
        # each computed field is a "numerator/denominator" expression evaluated per row
        computedFieldNames = []
        for row in rows:
            for k in computed:
                if "/" in k:
                    parts = k.split("/")
                    computedFieldName = "computed_" + parts[0].replace("%","") + "_div_" + parts[1].replace("%","")
                    try:
                        row[computedFieldName] = to_number(row[parts[0]]) / to_number(row[parts[1]])
                    except:
                        # skip rows where a field is missing or the denominator is zero
                        pass

                    if computedFieldName not in computedFieldNames:
                        computedFieldNames.append(computedFieldName)
        return (computedFieldNames, rows)
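    # For example, computed=["bytes_sent/duration"] (hypothetical field names)
    # adds a "computed_bytes_sent_div_duration" field to every row that has
    # both fields and a nonzero denominator.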
    def postprocess_results(self, rows, filter={}, groupBy=[], sum=[], count=[], avg=[], computed=[], maxi=[], maxDeltaTime=None):
        # callers may name fields with or without the leading "%"
        sum = [x.replace("%","") for x in sum]
        count = [x.replace("%","") for x in count]
        avg = [x.replace("%","") for x in avg]
        computed = [x.replace("%","") for x in computed]
        maxi = [x.replace("%","") for x in maxi]
        groupBy = [x.replace("%","") for x in groupBy]

        for (k, v) in filter.items():
            rows = self.filter_results(rows, k, v)

        if maxDeltaTime is not None:
            # keep only rows within maxDeltaTime of the newest row's timestamp
            maxTime = max([float(row["time"]) for row in rows])
            rows = [row for row in rows if float(row["time"]) >= maxTime - maxDeltaTime]

        (computedFieldNames, rows) = self.do_computed_fields(rows, computed)
        sum = sum + computedFieldNames

        rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
        return rows
    def remap(self, match):
        if not self.tableName in mappings:
            raise MappingException("no mapping for table %s" % self.tableName)

        mapping = mappings[self.tableName]

        token = match.group()[1:]   # strip the leading "%"
        if token in mapping:
            return mapping[token]

        raise MappingException('unknown token %s' % token)
    def dump_table(self, rows, keys=None):
        if not keys:
            keys = rows[0].keys()

        # column width: the longest of the key name and any value in the column
        lens = {}
        for key in keys:
            lens[key] = len(key)
        for row in rows:
            for key in keys:
                thislen = len(str(row.get(key, "")))
                lens[key] = max(lens.get(key, 0), thislen)

        for key in keys:
            print "%*s" % (lens[key], key),
        print

        for row in rows:
            for key in keys:
                print "%*s" % (lens[key], str(row.get(key, ""))),
            print
    def schema_to_cols(self, schema):
        fields = schema["fields"]

        colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"}

        cols = []
        for (i, field) in enumerate(fields):
            col = {"type": colTypes[field["type"]],
                   "id": "Col%d" % i,
                   "label": reverse_mappings[self.tableName].get(field["name"], field["name"])}
            cols.append(col)

        return cols
def main():
    bq = BigQueryAnalytics()

    rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")
    bq.dump_table(rows)

if __name__ == "__main__":
    main()