12 from apiclient.discovery import build
13 from apiclient.errors import HttpError
14 from oauth2client.client import AccessTokenRefreshError
15 from oauth2client.client import OAuth2WebServerFlow
16 from oauth2client.client import flow_from_clientsecrets
17 from oauth2client.file import Storage
18 from oauth2client.tools import run_flow,run
# Host dependencies (install notes):
#   yum -y install python-httplib2
#   easy_install python_gflags
#   easy_install google_api_python_client
# Google BigQuery project that holds the vicci demoevents data.
PROJECT_NUMBER = '549187599759'

# OAuth2 flow for BigQuery access.  NOTE: this reads the client-secrets file
# at import time, so importing this module requires that file to exist.
FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
                               scope='https://www.googleapis.com/auth/bigquery')

# Millisecond time spans; used as negative offsets from "now" in BigQuery
# table decorators (e.g. timeStart=-HOUR_MS means "the last hour").
# NOTE(review): referenced below (get_usage defaults, main) but not defined
# anywhere in the visible file — defined here with their canonical values.
MINUTE_MS = 60 * 1000
HOUR_MS = 60 * 60 * 1000
def __init__(self):
    # Fetch the token->column-name mapping for the "demoevents" table
    # (network call) and build the inverse map so BigQuery result columns
    # can be translated back to friendly names.
    self.mapping = json.loads(self.fetch_mapping(table="demoevents"))
    self.reverse_mapping = {v: k for k, v in self.mapping.items()}
def fetch_mapping(self, m=0, table="events"):
    """Fetch the register-allocation (column name) mapping for *table*.

    Returns the response body (JSON text) on HTTP 200; raises Exception
    otherwise.  *m* selects the multiplexer on the scrutiny service.
    """
    req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s' % (m, table)
    resp = requests.get(req)
    if resp.status_code == 200:
        return resp.text
    raise Exception('Error accessing register allocations: %d' % resp.status_code)
def run_query_old(self, query):
    """Run *query* via the cloud-scrutiny proxy service (legacy path).

    Returns the response body text on HTTP 200; raises Exception otherwise.
    NOTE: urllib.quote is Python-2 only (urllib.parse.quote on py3).
    """
    req = 'http://cloud-scrutiny.appspot.com/command?action=send_query&q=%s' % urllib.quote(query)
    resp = requests.get(req)
    if resp.status_code == 200:
        return resp.text
    raise Exception('Error running query: %d' % resp.status_code)
def run_query(self, query):
    """Run *query* directly against BigQuery and return a list of row dicts.

    Credentials are cached in bigquery_credentials.dat; if absent/invalid an
    interactive OAuth2 flow is started.  Column names in the result are
    translated back through self.reverse_mapping where possible.
    """
    storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
    credentials = storage.get()

    if credentials is None or credentials.invalid:
        # Interactive browser-based OAuth2 dance; persists into storage.
        credentials = run(FLOW, storage)

    http = httplib2.Http()
    http = credentials.authorize(http)

    service = build('bigquery', 'v2', http=http)

    body = {"query": query}
    response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

    fieldNames = []
    for field in response["schema"]["fields"]:
        fieldNames.append(field["name"])

    result = []
    if "rows" in response:
        for row in response["rows"]:
            this_result = {}
            for (i, column) in enumerate(row["f"]):
                # Map obfuscated column names back to friendly token names.
                this_result[self.reverse_mapping.get(fieldNames[i], fieldNames[i])] = column["v"]
            result.append(this_result)

    return result
def remap(self, match):
    """re.sub callback: translate a '%token' into its mapped column name.

    *match* is a regex match whose text starts with '%'.  Raises Exception
    for tokens not present in self.mapping.
    """
    token = match.group()[1:]  # strip the leading '%'
    if token in self.mapping:
        return self.mapping[token]
    raise Exception('unknown token %s' % token)
def get_usage(self, cp=None, hostname=None, site=None, slice=None, timeStart=-HOUR_MS, timeStop=-1, groupBy=["%hostname", "%cp"]):
    """Query per-host usage stats from BigQuery.

    Optional filters: cp, hostname, site (substring match), slice.
    timeStart/timeStop are millisecond offsets for the table decorator;
    timeStart=None queries the whole table.  groupBy=None disables GROUP BY.
    Returns the row dicts with numeric fields coerced and a derived "KBps".
    NOTE: groupBy's mutable default is never mutated here, so it is safe.
    """
    where = []
    if slice is not None:
        where.append("%slice='" + slice + "'")
    if cp is not None:
        where.append("%cp='" + cp + "'")
    if hostname is not None:
        where.append("%hostname='" + hostname + "'")
    if site is not None:
        where.append("%hostname contains " + site)
    where.append("%bytes_sent>0")
    where = "WHERE " + " AND ".join(where)

    if timeStart is not None:
        # Table decorator limits the scan to [timeStart, timeStop] ms.
        tableName = "[vicci.demoevents@%d-%d]" % (timeStart, timeStop)
    else:
        tableName = "[vicci.demoevents]"

    query = "SELECT %hostname,%cp,sum(%bytes_sent) as sum_bytes_sent,sum(%bytes_hit) as sum_bytes_hit, AVG(%bandwidth) as avg_bandwidth," + \
            " MAX(TIMESTAMP_TO_MSEC(time))-MIN(TIMESTAMP_TO_MSEC(time)) as time_delta FROM " + \
            tableName + " " + where

    if groupBy is not None:
        query = query + " GROUP BY " + ",".join(groupBy)

    # Translate every %token into its obfuscated BigQuery column name.
    # (Fixed character class: 'a-zA-z' also matched punctuation between
    # 'Z' and 'a'; 'a-zA-Z_' is what was intended.)
    p = re.compile('%[a-zA-Z_]*')
    query = p.sub(self.remap, query)

    rows = self.run_query(query)

    for row in rows:
        row["sum_bytes_sent"] = int(row.get("sum_bytes_sent", 0))
        row["sum_bytes_hit"] = int(row.get("sum_bytes_hit", 0))
        row["avg_bandwidth"] = int(float(row.get("avg_bandwidth", 0)))
        row["time_delta"] = float(row.get("time_delta", 0.0)) / 1000.0

        # Average send rate over the queried window, in KB/s.
        elapsed = (timeStop - timeStart) / 1000
        KBps = int(row.get("sum_bytes_sent", 0)) / elapsed / 1024
        row["KBps"] = KBps

    return rows
def sites_from_usage(self, rows, nodes_to_sites={}):
    """Aggregate per-host usage *rows* into per-site rows.

    The site for a host comes from *nodes_to_sites* when present, otherwise
    from the second component of the dotted hostname (hosts with fewer than
    three components are skipped).  Sums bytes/KBps, maxes bandwidth and
    time_delta.  Input row dicts are copied, not mutated.
    NOTE: nodes_to_sites' mutable default is never mutated here.
    """
    sites = {}
    for row in rows:
        hostname = row["hostname"]

        if hostname in nodes_to_sites:
            site_name = nodes_to_sites[hostname]
        else:
            parts = hostname.split(".")
            if len(parts) <= 2:
                # Not a dotted node name we can derive a site from.
                continue
            site_name = parts[1]

        if not (site_name in sites):
            # First row for this site: seed the site row from a copy so the
            # caller's per-host rows are left intact.
            row = row.copy()
            row["site"] = site_name
            row["max_avg_bandwidth"] = row["avg_bandwidth"]
            # sites table doesn't care about hostnames or avg_bandwidth
            del row["hostname"]
            del row["avg_bandwidth"]
            sites[site_name] = row
        else:
            site_row = sites[site_name]
            site_row["sum_bytes_sent"] = site_row["sum_bytes_sent"] + row["sum_bytes_sent"]
            site_row["sum_bytes_hit"] = site_row["sum_bytes_hit"] + row["sum_bytes_hit"]
            site_row["max_avg_bandwidth"] = max(site_row["max_avg_bandwidth"], row["avg_bandwidth"])
            site_row["time_delta"] = max(site_row["time_delta"], row["time_delta"])
            # KBps is additive across a site's hosts.
            site_row["KBps"] = site_row.get("KBps", 0) + row.get("KBps", 0)

    return sites.values()
def get_usage_sites(self, cp=None, slice=None, timeStart=-HOUR_MS, timeStop=-1):
    """Per-site aggregate of get_usage() for the same filters/window."""
    per_host_rows = self.get_usage(cp=cp, slice=slice, timeStart=timeStart, timeStop=timeStop)
    return self.sites_from_usage(per_host_rows)
170 def dump_table(self, rows, keys=None):
172 keys = rows[0].keys()
180 thislen = len(str(row.get(key,"")))
181 lens[key] = max(lens.get(key,0), thislen)
184 print "%*s" % (lens[key], key),
189 print "%*s" % (lens[key], str(row.get(key,""))),
class HpcQueryThread(HpcQuery, threading.Thread):
    """Background thread that periodically refreshes HPC usage data.

    Every *interval* seconds it re-runs get_usage()/sites_from_usage() and
    stores the results in self.rows / self.site_rows, bumping
    self.data_version on success.  Set self.please_die to stop the loop.
    """

    def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None, nodes_to_sites={}):
        threading.Thread.__init__(self)
        HpcQuery.__init__(self)
        self.daemon = True
        self.interval = interval
        self.timeStart = timeStart
        self.nodes_to_sites = nodes_to_sites
        # run() reads these as self.slice / self.cp.
        self.slice = slice
        self.cp = cp
        self.data_version = 0
        self.please_die = False
        self.update_time = time.time()
        self.start()

    def is_stalled(self):
        # Stalled = no successful refresh for more than 5 minutes.
        if time.time() - self.update_time > 300:
            return True
        return False

    def new_data(self):
        # Hook invoked after each successful refresh; subclasses override.
        pass

    def run(self):
        while not self.please_die:
            try:
                self.rows = self.get_usage(timeStart=self.timeStart, cp=self.cp, slice=self.slice)
                self.site_rows = self.sites_from_usage(self.rows, self.nodes_to_sites)
                self.update_time = time.time()
                self.new_data()
                self.data_version += 1
            except Exception:
                # Best-effort refresh loop: log the failure and keep going.
                # (open() replaces py2-only file(); 'with' closes the handle.)
                with open("/tmp/hpcquery_fail.txt", "a") as f:
                    f.write(traceback.format_exc() + "\n")
            time.sleep(self.interval)
228 class HpcDumpThread(HpcQueryThread):
229 def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None):
230 HpcQueryThread.__init__(self, interval, slice, timeStart, cp)
235 print "update %d, data for last %d minutes" % (self.data_version, -self.timeStart/1000/60)
238 self.dump_table(self.rows, ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
240 self.dump_table(self.site_rows, ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
def main():
    """Demo driver: dump per-host and per-site usage for three windows."""
    hq = HpcQuery()

    # Last 5 minutes.
    hq.dump_table(hq.get_usage(timeStart=-MINUTE_MS*5), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
    print
    hq.dump_table(hq.get_usage_sites(timeStart=-MINUTE_MS*5), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
    print

    # Last hour (the default window).
    hq.dump_table(hq.get_usage(), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
    print
    hq.dump_table(hq.get_usage_sites(), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
    print

    # Last 24 hours.
    hq.dump_table(hq.get_usage(timeStart=-HOUR_MS*24), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
    hq.dump_table(hq.get_usage_sites(timeStart=-HOUR_MS*24), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])

if __name__ == "__main__":
    main()