from apiclient.discovery import build
from apiclient.errors import HttpError
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
from oauth2client.tools import run_flow, run

# Prerequisites (install on the host running this module):
#   yum -y install python-httplib2
#   easy_install python_gflags
#   easy_install google_api_python_client

PROJECT_NUMBER = '549187599759'

# Build the OAuth2 flow from the stored client secrets; if the secrets file is
# missing or unreadable, set FLOW to None so the failure surfaces at query time.
try:
    FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
                                   scope='https://www.googleapis.com/auth/bigquery')
except:
    print "exception while initializing bigquery flow"
    FLOW = None

# globals to hold cached mappings (logical field name <-> BigQuery column name)
mappings = {}
reverse_mappings = {}
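
# Helper assumed by groupby_results() and do_computed_fields() below but not
# shown in this excerpt: coerce BigQuery's string cell values to int or float,
# falling back to 0 when the value is not numeric.
def to_number(s):
    try:
        if "." in str(s):
            return float(s)
        else:
            return int(s)
    except:
        return 0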

class MappingException(Exception):
    pass
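
# BigQueryAnalytics issues queries against the project's BigQuery tables.
# Queries are written with %-prefixed logical field names (e.g. %hostname),
# which remap() translates to physical column names using the mapping fetched
# from the cloud-scrutiny service; results are translated back the other way.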
class BigQueryAnalytics:
    def __init__(self, table="demoevents"):
        self.projectName = "vicci"
        self.tableName = table
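
    # Fetch the field-name mapping for this table and build the reverse map
    # used to translate query results back to logical field names.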
    def reload_mapping(self):
        global mappings, reverse_mappings
        mappings[self.tableName] = json.loads(self.fetch_mapping(table=self.tableName))
        reverse_mappings[self.tableName] = {v: k for k, v in mappings[self.tableName].items()}
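
    # Ask the cloud-scrutiny service for the column allocations of a table and
    # return the raw JSON text.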
    def fetch_mapping(self, m=0, table="events"):
        req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s' % (m, table)
        resp = requests.get(req)
        if (resp.status_code == 200):
            return resp.text
        else:
            raise Exception('Error accessing register allocations: %d' % resp.status_code)
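
    # Substitute %tokens in the query, authorize against BigQuery, and return
    # the raw API response for the query job.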
    def run_query_raw(self, query):
        p = re.compile('%[a-zA-Z_]*')

        try:
            query = p.sub(self.remap, query)
        except MappingException:
            # mapping is missing or stale -- refresh it and retry once
            self.reload_mapping()
            query = p.sub(self.remap, query)

        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            credentials = run(FLOW, storage)

        http = httplib2.Http()
        http = credentials.authorize(http)

        service = build('bigquery', 'v2', http=http)

        body = {"query": query}
        response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

        return response
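
    # Rewrite the field names in a response's schema back to logical names.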
    def translate_schema(self, response):
        for field in response["schema"]["fields"]:
            field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"])
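
    # Run a query and return the rows as a list of dicts keyed by logical
    # field name.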
    def run_query(self, query):
        response = self.run_query_raw(query)

        fieldNames = []
        for field in response["schema"]["fields"]:
            fieldNames.append(field["name"])

        result = []
        if "rows" in response:
            for row in response["rows"]:
                this_result = {}
                for (i, column) in enumerate(row["f"]):
                    this_result[reverse_mappings[self.tableName].get(fieldNames[i], fieldNames[i])] = column["v"]
                result.append(this_result)

        return result
120 """ Filter_results, groupby_results, do_computed_fields, and postprocess_results
121 are all used for postprocessing queries. The idea is to do one query that
122 includes the ungrouped and unfiltered data, and cache it for multiple
123 consumers who will filter and group it as necessary.
125 TODO: Find a more generalized source for these sorts operations. Perhaps
126 put the results in SQLite and then run SQL queries against it.
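
    # Typical flow (a sketch; the field names are illustrative): run one broad
    # query, cache the rows, then let each consumer derive its own view, e.g.
    #   rows = bq.run_query("select %hostname, %bytes_sent, %time from [vicci.demoevents]")
    #   view = bq.postprocess_results(rows, filter={"hostname": "node1"},
    #                                 groupBy=["hostname"], sum=["bytes_sent"],
    #                                 maxDeltaTime=3600)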
    def filter_results(self, rows, name, value):
        result = [row for row in rows if row.get(name) == value]
        return result
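
    # Group rows by the groupBy columns and compute sum_/avg_/max_/count_
    # aggregates over the named fields within each group.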
    def groupby_results(self, rows, groupBy=[], sum=[], count=[], avg=[], maxi=[]):
        new_rows = {}
        for row in rows:
            groupby_key = [row.get(k, None) for k in groupBy]

            if str(groupby_key) not in new_rows:
                new_row = {}
                for k in groupBy:
                    new_row[k] = row.get(k, None)
                new_rows[str(groupby_key)] = new_row
            else:
                new_row = new_rows[str(groupby_key)]

            for k in sum:
                new_row["sum_" + k] = new_row.get("sum_" + k, 0) + to_number(row.get(k, 0))
            for k in avg:
                new_row["avg_" + k] = new_row.get("avg_" + k, 0) + to_number(row.get(k, 0))
                new_row["avg_base_" + k] = new_row.get("avg_base_" + k, 0) + 1
            for k in maxi:
                new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k, 0)))
            for k in count:
                new_row["count_" + k] = new_row.get("count_" + k, 0) + 1

        # finalize averages now that the group counts are known
        for row in new_rows.values():
            for k in avg:
                row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k]
                del row["avg_base_" + k]

        return new_rows.values()
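
    # Add ratio columns: each entry in 'computed' is "<numerator>/<denominator>"
    # and produces a computed_<a>_div_<b> field on every row.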
    def do_computed_fields(self, rows, computed=[]):
        computedFieldNames = []
        for row in rows:
            for k in computed:
                if "/" in k:
                    parts = k.split("/")
                    computedFieldName = "computed_" + parts[0].replace("%", "") + "_div_" + parts[1].replace("%", "")
                    try:
                        row[computedFieldName] = to_number(row[parts[0]]) / to_number(row[parts[1]])
                    except:
                        pass
                    if computedFieldName not in computedFieldNames:
                        computedFieldNames.append(computedFieldName)
        return (computedFieldNames, rows)
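
    # Apply the full post-processing pipeline to cached rows: filter, trim to
    # the most recent maxDeltaTime window, add computed fields, then
    # group and aggregate.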
    def postprocess_results(self, rows, filter={}, groupBy=[], sum=[], count=[], avg=[], computed=[], maxi=[], maxDeltaTime=None):
        sum = [x.replace("%", "") for x in sum]
        count = [x.replace("%", "") for x in count]
        avg = [x.replace("%", "") for x in avg]
        computed = [x.replace("%", "") for x in computed]
        maxi = [x.replace("%", "") for x in maxi]

        for (k, v) in filter.items():
            rows = self.filter_results(rows, k, v)

        if maxDeltaTime is not None:
            maxTime = max([float(row["time"]) for row in rows])
            rows = [row for row in rows if float(row["time"]) >= maxTime - maxDeltaTime]

        (computedFieldNames, rows) = self.do_computed_fields(rows, computed)
        sum = sum + computedFieldNames
        rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
        return rows
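
    # re.sub() callback: translate a %token to its physical column name, or
    # raise MappingException so the caller can reload the mapping and retry.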
    def remap(self, match):
        if self.tableName not in mappings:
            raise MappingException("no mapping for table %s" % self.tableName)

        mapping = mappings[self.tableName]

        token = match.group()[1:]
        if token in mapping:
            return mapping[token]
        else:
            raise MappingException('unknown token %s' % token)
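
    # Print rows as a fixed-width text table, sizing each column to its widest
    # value.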
    def dump_table(self, rows, keys=None):
        if not keys:
            keys = rows[0].keys()

        lens = dict([(key, len(key)) for key in keys])
        for row in rows:
            for key in keys:
                thislen = len(str(row.get(key, "")))
                lens[key] = max(lens.get(key, 0), thislen)

        for key in keys:
            print "%*s" % (lens[key], key),
        print
        for row in rows:
            for key in keys:
                print "%*s" % (lens[key], str(row.get(key, ""))),
            print
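
    # Convert a BigQuery schema into simple column descriptors ({"type",
    # "label"}) with logical field names as labels.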
    def schema_to_cols(self, schema):
        fields = schema["fields"]

        colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"}

        cols = []
        for field in fields:
            col = {"type": colTypes[field["type"]],
                   "label": reverse_mappings[self.tableName].get(field["name"], field["name"])}
            cols.append(col)

        return cols
if __name__ == "__main__":
    bq = BigQueryAnalytics()
    rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")
    bq.dump_table(rows)