# Check-in: hpc_wizard and analytics Python source
# [plstackapi.git] / planetstack / hpc_wizard / query.py
from __future__ import print_function

import base64
import json
import os
import re
import threading
import time
import traceback
import urllib

import httplib2
import requests
from apiclient.discovery import build
from apiclient.errors import HttpError
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
from oauth2client.tools import run_flow, run
19
20 """
21 yum -y install python-httplib2
22 easy_install python_gflags
23 easy_install google_api_python_client
24 """
25
26
27 PROJECT_NUMBER = '549187599759'
28
29 FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
30                                scope='https://www.googleapis.com/auth/bigquery')
31
32 MINUTE_MS = 60*1000
33 HOUR_MS = 60*60*1000
34
35 class HpcQuery:
36     def __init__(self):
37         self.mapping = json.loads(self.fetch_mapping(table="demoevents"))
38         self.reverse_mapping = {v:k for k, v in self.mapping.items()}
39
40     def fetch_mapping(self, m=0, table="events"):
41         req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table)
42         resp = requests.get(req)
43         if (resp.status_code==200):
44                 return resp.text
45         else:
46                 raise Exception('Error accessing register allocations: %d'%resp.status_code)
47
48     def run_query_old(self, query):
49         req = 'http://cloud-scrutiny.appspot.com/command?action=send_query&q=%s' % urllib.quote(query)
50         resp = requests.get(req)
51         if (resp.status_code==200):
52                 return resp.text
53         else:
54                 raise Exception('Error running query: %d'%resp.status_code)
55         return resp
56
57     def run_query(self, query):
58         storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
59         credentials = storage.get()
60
61         if credentials is None or credentials.invalid:
62                 credentials = run(FLOW, storage)
63
64         http = httplib2.Http()
65         http = credentials.authorize(http)
66
67         service = build('bigquery', 'v2', http=http)
68
69         body = {"query": query}
70         response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()
71
72         fieldNames = []
73         for field in response["schema"]["fields"]:
74             fieldNames.append(field["name"])
75
76         result = []
77         if "rows" in response:
78             for row in response["rows"]:
79                 this_result = {}
80                 for (i,column) in enumerate(row["f"]):
81                     this_result[self.reverse_mapping.get(fieldNames[i],fieldNames[i])] = column["v"]
82                 result.append(this_result)
83
84         return result
85
86     def remap(self, match):
87         token = match.group()[1:]
88         if token in self.mapping:
89             return self.mapping[token]
90         else:
91             raise Exception('unknown token %s' % token)
92
93     def get_usage(self, cp=None, hostname=None, site=None, slice=None, timeStart=-HOUR_MS, timeStop=-1, groupBy=["%hostname", "%cp"]):
94         where = []
95         if slice is not None:
96             where.append("%slice='" + slice + "'")
97         if cp is not None:
98             where.append("%cp='" + cp + "'")
99         if hostname is not None:
100             where.append("%hostname='" + hostname + "'")
101         if site is not None:
102             where.append("%hostname contains " + site)
103         where.append("%bytes_sent>0")
104         where = "WHERE " + " AND ".join(where)
105
106         if timeStart is not None:
107              tableName = "[vicci.demoevents@%d-%d]" % (timeStart,timeStop)
108         else:
109              tableName = "[vicci.demoevents]"
110
111         query = "SELECT %hostname,%cp,sum(%bytes_sent) as sum_bytes_sent,sum(%bytes_hit) as sum_bytes_hit, AVG(%bandwidth) as avg_bandwidth," + \
112                 " MAX(TIMESTAMP_TO_MSEC(time))-MIN(TIMESTAMP_TO_MSEC(time)) as time_delta FROM " + \
113                 tableName + " " + where
114
115         if groupBy:
116             query = query + " GROUP BY " + ",".join(groupBy)
117
118         p = re.compile('%[a-zA-z_]*')
119         query = p.sub(self.remap, query)
120
121         rows = self.run_query(query)
122
123         for row in rows:
124             row["sum_bytes_sent"] = int(row.get("sum_bytes_sent",0))
125             row["sum_bytes_hit"] = int(row.get("sum_bytes_hit",0))
126             row["avg_bandwidth"] = int(float(row.get("avg_bandwidth",0)))
127             row["time_delta"] = float(row.get("time_delta",0.0))/1000.0
128
129             elapsed = (timeStop-timeStart)/1000
130             KBps = int(row.get("sum_bytes_sent",0)) / elapsed / 1024
131             row["KBps"] = KBps
132
133         return rows
134
135     def sites_from_usage(self, rows, nodes_to_sites={}):
136         sites = {}
137         for row in rows:
138             hostname = row["hostname"]
139
140             if hostname in nodes_to_sites:
141                 site_name = nodes_to_sites[hostname]
142             else:
143                 parts = hostname.split(".")
144                 if len(parts)<=2:
145                     continue
146                 site_name = parts[1]
147
148             if not (site_name in sites):
149                 row = row.copy()
150                 row["site"] = site_name
151                 row["max_avg_bandwidth"] = row["avg_bandwidth"]
152                 # sites table doesn't care about hostnames or avg_bandwidth
153                 del row["hostname"]
154                 del row["avg_bandwidth"]
155                 sites[site_name] = row
156             else:
157                 site_row = sites[site_name]
158                 site_row["sum_bytes_sent"] = site_row["sum_bytes_sent"] + row["sum_bytes_sent"]
159                 site_row["sum_bytes_hit"] = site_row["sum_bytes_hit"] + row["sum_bytes_hit"]
160                 site_row["max_avg_bandwidth"] = max(site_row["max_avg_bandwidth"], row["avg_bandwidth"])
161                 site_row["time_delta"] = max(site_row["time_delta"], row["time_delta"])
162
163         return sites.values()
164
165     def get_usage_sites(self, cp=None, slice=None, timeStart=-HOUR_MS, timeStop=-1):
166         rows = self.get_usage(cp=cp, slice=slice, timeStart=timeStart, timeStop=timeStop)
167
168         return self.sites_from_usage(rows)
169
170     def dump_table(self, rows, keys=None):
171         if not keys:
172             keys = rows[0].keys()
173
174         lens = {}
175         for key in keys:
176             lens[key] = len(key)
177
178         for row in rows:
179             for key in keys:
180                 thislen = len(str(row.get(key,"")))
181                 lens[key] = max(lens.get(key,0), thislen)
182
183         for key in keys:
184             print "%*s" % (lens[key], key),
185         print
186
187         for row in rows:
188             for key in keys:
189                 print "%*s" % (lens[key], str(row.get(key,""))),
190             print
191
192 class HpcQueryThread(HpcQuery, threading.Thread):
193     def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None, nodes_to_sites={}):
194         threading.Thread.__init__(self)
195         HpcQuery.__init__(self)
196         self.daemon = True
197         self.interval = interval
198         self.timeStart = timeStart
199         self.nodes_to_sites = nodes_to_sites
200         self.slice = slice
201         self.cp = cp
202         self.data_version = 0
203         self.please_die = False
204         self.update_time = time.time()
205         self.start()
206
207     def is_stalled(self):
208         if time.time()-self.update_time > 300:
209             return True
210         else:
211             return False
212
213     def run(self):
214         while not self.please_die:
215             try:
216                 self.rows = self.get_usage(timeStart=self.timeStart, cp=self.cp, slice=self.slice)
217                 self.site_rows = self.sites_from_usage(self.rows, self.nodes_to_sites)
218                 self.update_time = time.time()
219                 self.new_data()
220                 self.data_version += 1
221             except:
222                 file("/tmp/hpcquery_fail.txt","a").write(traceback.format_exc() + "\n")
223             time.sleep(self.interval)
224
225     def new_data(self):
226         pass
227
228 class HpcDumpThread(HpcQueryThread):
229     def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None):
230         HpcQueryThread.__init__(self, interval, slice, timeStart, cp)
231
232     def new_data(self):
233         os.system("clear")
234
235         print "update %d, data for last %d minutes" % (self.data_version, -self.timeStart/1000/60)
236         print
237
238         self.dump_table(self.rows, ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
239         print
240         self.dump_table(self.site_rows, ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
241         print
242
243
244 def main_old():
245     hq = HpcQuery()
246 #    print hq.mapping
247
248     print "5 minute"
249     hq.dump_table(hq.get_usage(timeStart=-MINUTE_MS*5), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
250     print
251     hq.dump_table(hq.get_usage_sites(timeStart=-MINUTE_MS*5), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
252     print
253
254     print "1 hour"
255     hq.dump_table(hq.get_usage(), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
256     print
257     hq.dump_table(hq.get_usage_sites(), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
258     print
259
260     print "24 hours"
261     hq.dump_table(hq.get_usage(timeStart=-HOUR_MS*24), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
262     hq.dump_table(hq.get_usage_sites(timeStart=-HOUR_MS*24), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
263     print
264
265 def main():
266     hd = HpcDumpThread()
267     while True:
268         time.sleep(30)
269
270 if __name__ == "__main__":
271     main()