12 from apiclient.discovery import build
13 from apiclient.errors import HttpError
14 from oauth2client.client import AccessTokenRefreshError
15 from oauth2client.client import OAuth2WebServerFlow
16 from oauth2client.client import flow_from_clientsecrets
17 from oauth2client.file import Storage
18 from oauth2client.tools import run_flow,run
21 yum -y install python-httplib2
22 easy_install python_gflags
23 easy_install google_api_python_client
# Google Cloud project number that all BigQuery jobs are submitted under.
27 PROJECT_NUMBER = '549187599759'
# Build the OAuth2 web-server flow from the on-disk client-secrets file,
# requesting read access to BigQuery.
# NOTE(review): the try/except wrapper lines are missing from this excerpt;
# the print below is presumably the body of the except clause — confirm
# against the full file.
30 FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
31 scope='https://www.googleapis.com/auth/bigquery')
33 print "exception while initializing bigquery flow"
# Fragment of HpcQuery.__init__ (the def line is missing from this excerpt):
# fetch the token->column mapping for the "demoevents" table from the
# remote service, then build the reverse map (BigQuery column name ->
# friendly token) used when decoding query results.
42 self.mapping = json.loads(self.fetch_mapping(table="demoevents"))
43 self.reverse_mapping = {v:k for k, v in self.mapping.items()}
# Fetch the register/column allocation map for `table` from the
# cloud-scrutiny service. `m` selects the multiplexer index.
# On HTTP 200 the response body is returned (the return line is missing
# from this excerpt — presumably resp.text; confirm against full file);
# any other status raises Exception.
45 def fetch_mapping(self, m=0, table="events"):
46 req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table)
47 resp = requests.get(req)
48 if (resp.status_code==200):
# Non-200: surface the status code to the caller.
51 raise Exception('Error accessing register allocations: %d'%resp.status_code)
# Legacy query path: send the query text to the cloud-scrutiny proxy
# service instead of calling BigQuery directly. The query is URL-encoded
# with urllib.quote (Python 2 API). On HTTP 200 the response body is
# returned (the return line is missing from this excerpt — confirm);
# any other status raises Exception.
53 def run_query_old(self, query):
54 req = 'http://cloud-scrutiny.appspot.com/command?action=send_query&q=%s' % urllib.quote(query)
55 resp = requests.get(req)
56 if (resp.status_code==200):
59 raise Exception('Error running query: %d'%resp.status_code)
# Execute `query` synchronously against BigQuery and decode the result
# rows into dicts keyed by friendly column names (via self.reverse_mapping).
# NOTE(review): several interior lines are missing from this excerpt
# (initialization of fieldNames/result/this_result and the final return —
# presumably `return result`); confirm against the full file.
62 def run_query(self, query):
# Load cached OAuth2 credentials from disk; if absent or expired, run the
# interactive flow and persist the result back to the same file.
63 storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
64 credentials = storage.get()
66 if credentials is None or credentials.invalid:
67 credentials = run(FLOW, storage)
# Wrap an HTTP client with the authorized credentials.
69 http = httplib2.Http()
70 http = credentials.authorize(http)
# Build the BigQuery v2 service handle and submit a synchronous query job.
72 service = build('bigquery', 'v2', http=http)
74 body = {"query": query}
75 response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()
# Collect the column names from the response schema, in order.
78 for field in response["schema"]["fields"]:
79 fieldNames.append(field["name"])
# Decode each row: BigQuery returns cells as row["f"][i]["v"]; map each
# column name back through reverse_mapping (falling back to the raw name).
82 if "rows" in response:
83 for row in response["rows"]:
85 for (i,column) in enumerate(row["f"]):
86 this_result[self.reverse_mapping.get(fieldNames[i],fieldNames[i])] = column["v"]
87 result.append(this_result)
def remap(self, match):
    """Replace a '%token' regex match with its BigQuery column name.

    Strips the leading '%' from the matched text and looks the remaining
    token up in self.mapping. Raises Exception for unknown tokens.
    """
    token = match.group()[1:]
    try:
        return self.mapping[token]
    except KeyError:
        raise Exception('unknown token %s' % token)
# Build and run a BigQuery usage query filtered by content provider,
# hostname, site, and/or slice over [timeStart, timeStop] (millisecond
# offsets relative to now, per the table decorator below), then normalize
# the numeric fields of each returned row in place.
# NOTE(review): mutable default argument groupBy=[...] is shared across
# calls — callers must not mutate it.
# NOTE(review): many interior lines are missing from this excerpt
# (where-list initialization, several `if`/`else` headers, the row loop,
# and the final return — presumably `return rows`); confirm against the
# full file.
98 def get_usage(self, cp=None, hostname=None, site=None, slice=None, timeStart=-HOUR_MS, timeStop=-1, groupBy=["%hostname", "%cp"]):
# Assemble WHERE clauses from whichever filters were supplied.
# NOTE(review): values are spliced into the SQL by string concatenation —
# unsafe if any filter value is attacker-controlled.
100 if slice is not None:
101 where.append("%slice='" + slice + "'")
103 where.append("%cp='" + cp + "'")
104 if hostname is not None:
105 where.append("%hostname='" + hostname + "'")
107 where.append("%hostname contains " + site)
108 where.append("%bytes_sent>0")
109 where = "WHERE " + " AND ".join(where)
# Use a BigQuery table decorator to restrict the scanned time range.
111 if timeStart is not None:
112 tableName = "[vicci.demoevents@%d-%d]" % (timeStart,timeStop)
114 tableName = "[vicci.demoevents]"
116 query = "SELECT %hostname,%cp,sum(%bytes_sent) as sum_bytes_sent,sum(%bytes_hit) as sum_bytes_hit, AVG(%bandwidth) as avg_bandwidth," + \
117 " MAX(TIMESTAMP_TO_MSEC(time))-MIN(TIMESTAMP_TO_MSEC(time)) as time_delta FROM " + \
118 tableName + " " + where
121 query = query + " GROUP BY " + ",".join(groupBy)
# Substitute every %token for its real column name via self.remap.
# NOTE(review): the character class 'a-zA-z' is almost certainly a typo
# for 'a-zA-Z' — the A-z range also matches '[', ']', '^', '_', '`'.
123 p = re.compile('%[a-zA-z_]*')
124 query = p.sub(self.remap, query)
126 rows = self.run_query(query)
# Coerce the stringly-typed BigQuery cells to ints/floats; time_delta is
# converted from milliseconds to seconds.
129 row["sum_bytes_sent"] = int(row.get("sum_bytes_sent",0))
130 row["sum_bytes_hit"] = int(row.get("sum_bytes_hit",0))
131 row["avg_bandwidth"] = int(float(row.get("avg_bandwidth",0)))
132 row["time_delta"] = float(row.get("time_delta",0.0))/1000.0
# Derive average KB/s over the elapsed window (both bounds in ms).
134 elapsed = (timeStop-timeStart)/1000
135 KBps = int(row.get("sum_bytes_sent",0)) / elapsed / 1024
# Aggregate per-hostname usage rows into per-site rows. The site for a
# hostname comes from nodes_to_sites when present; otherwise it is derived
# from the hostname itself (derivation lines missing from this excerpt).
# Returns the collected site rows (sites.values()).
# NOTE(review): mutable default argument nodes_to_sites={} is shared
# across calls; also rows are mutated in place (site/avg_bandwidth keys).
# NOTE(review): interior lines are missing from this excerpt (sites dict
# initialization, the row loop header, the hostname->site fallback, and
# the else: of the first-seen check); confirm against the full file.
140 def sites_from_usage(self, rows, nodes_to_sites={}):
143 hostname = row["hostname"]
145 if hostname in nodes_to_sites:
146 site_name = nodes_to_sites[hostname]
# Fallback: derive the site from the hostname's dotted components.
148 parts = hostname.split(".")
# First row seen for this site becomes the site row.
153 if not (site_name in sites):
155 row["site"] = site_name
156 row["max_avg_bandwidth"] = row["avg_bandwidth"]
157 # sites table doesn't care about hostnames or avg_bandwidth
159 del row["avg_bandwidth"]
160 sites[site_name] = row
# Subsequent rows for the same site: accumulate byte counters, keep the
# max of bandwidth and time_delta.
162 site_row = sites[site_name]
163 site_row["sum_bytes_sent"] = site_row["sum_bytes_sent"] + row["sum_bytes_sent"]
164 site_row["sum_bytes_hit"] = site_row["sum_bytes_hit"] + row["sum_bytes_hit"]
165 site_row["max_avg_bandwidth"] = max(site_row["max_avg_bandwidth"], row["avg_bandwidth"])
166 site_row["time_delta"] = max(site_row["time_delta"], row["time_delta"])
168 return sites.values()
def get_usage_sites(self, cp=None, slice=None, timeStart=-HOUR_MS, timeStop=-1):
    """Per-site usage: run get_usage() over the window, then fold the rows by site."""
    per_node_rows = self.get_usage(cp=cp, slice=slice, timeStart=timeStart, timeStop=timeStop)
    return self.sites_from_usage(per_node_rows)
# Pretty-print `rows` (list of dicts) as a fixed-width text table to
# stdout using Python 2 print statements. When keys is None the columns
# are taken from the first row's keys. A first pass computes the widest
# value per column; a second pass prints header and rows right-aligned.
# NOTE(review): interior lines are missing from this excerpt (the lens
# dict initialization, both loop headers, and the trailing bare prints
# that end each line); confirm against the full file.
175 def dump_table(self, rows, keys=None):
177 keys = rows[0].keys()
# First pass: column width = max rendered cell length seen so far.
185 thislen = len(str(row.get(key,"")))
186 lens[key] = max(lens.get(key,0), thislen)
# Header row, padded to the computed widths.
189 print "%*s" % (lens[key], key),
# Data rows; missing keys render as the empty string.
194 print "%*s" % (lens[key], str(row.get(key,""))),
# Background poller: repeatedly runs the usage query every `interval`
# seconds and caches the results (self.rows / self.site_rows) for readers.
# data_version is bumped on every successful refresh so consumers can
# detect new data; please_die requests a clean shutdown of the loop.
# NOTE(review): interior lines are missing from this excerpt
# (self.slice/self.cp assignment, self.daemon/start lines, the
# is_stalled return statements, the run() def line and its try/except
# headers); confirm against the full file.
197 class HpcQueryThread(HpcQuery, threading.Thread):
198 def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None, nodes_to_sites={}):
199 threading.Thread.__init__(self)
200 HpcQuery.__init__(self)
202 self.interval = interval
203 self.timeStart = timeStart
204 self.nodes_to_sites = nodes_to_sites
207 self.data_version = 0
208 self.please_die = False
209 self.update_time = time.time()
# Report the thread as stalled if no successful refresh in 300 seconds.
212 def is_stalled(self):
213 if time.time()-self.update_time > 300:
# Poll loop: refresh rows and site aggregation, record the refresh time,
# and bump data_version; failures are appended to a /tmp log (Python 2
# file() builtin) without killing the loop.
219 while not self.please_die:
221 self.rows = self.get_usage(timeStart=self.timeStart, cp=self.cp, slice=self.slice)
222 self.site_rows = self.sites_from_usage(self.rows, self.nodes_to_sites)
223 self.update_time = time.time()
225 self.data_version += 1
227 file("/tmp/hpcquery_fail.txt","a").write(traceback.format_exc() + "\n")
228 time.sleep(self.interval)
# Variant of HpcQueryThread that, after each refresh, dumps the cached
# per-host and per-site tables to stdout (Python 2 print statements).
# NOTE(review): interior lines are missing from this excerpt (the method
# def line containing the prints, and any surrounding blank/print lines);
# confirm against the full file.
233 class HpcDumpThread(HpcQueryThread):
234 def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None):
235 HpcQueryThread.__init__(self, interval, slice, timeStart, cp)
# Announce the refresh: data_version plus the window length in minutes
# (timeStart is a negative millisecond offset, hence the negation).
240 print "update %d, data for last %d minutes" % (self.data_version, -self.timeStart/1000/60)
243 self.dump_table(self.rows, ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
245 self.dump_table(self.site_rows, ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
# Ad-hoc demo/driver code: dump per-host and per-site usage tables for
# three windows — the last 5 minutes, the default window, and the last
# 24 hours — using a previously constructed HpcQuery instance `hq`.
# NOTE(review): the construction of hq, the function wrappers, and the
# body of the __main__ guard are missing from this excerpt; confirm
# against the full file.
254 hq.dump_table(hq.get_usage(timeStart=-MINUTE_MS*5), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
256 hq.dump_table(hq.get_usage_sites(timeStart=-MINUTE_MS*5), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
260 hq.dump_table(hq.get_usage(), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
262 hq.dump_table(hq.get_usage_sites(), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
266 hq.dump_table(hq.get_usage(timeStart=-HOUR_MS*24), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
267 hq.dump_table(hq.get_usage_sites(timeStart=-HOUR_MS*24), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
275 if __name__ == "__main__":