planetstack/hpc_wizard/query.py
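
"""Query helpers for the PlanetStack HPC wizard: run BigQuery queries
against the vicci.demoevents table and aggregate the resulting usage rows
per node and per site."""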
import re
import base64
import requests
import urllib
import json
import httplib2
import threading
import os
import time
import traceback

from apiclient.discovery import build
from apiclient.errors import HttpError
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
# note: oauth2client deprecated run() in favor of run_flow() in later releases
from oauth2client.tools import run_flow, run

"""
yum -y install python-httplib2
easy_install python_gflags
easy_install google_api_python_client
"""


PROJECT_NUMBER = '549187599759'

try:
    FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
                                   scope='https://www.googleapis.com/auth/bigquery')
except Exception:
    print "exception while initializing bigquery flow"
    traceback.print_exc()
    FLOW = None

MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000

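# HpcQuery issues BigQuery queries written with %token placeholders
# (e.g. %hostname, %bytes_sent); remap() below rewrites the tokens into the
# physical column names fetched from the cloud-scrutiny allocation service.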
class HpcQuery:
    def __init__(self):
        self.mapping = json.loads(self.fetch_mapping(table="demoevents"))
        self.reverse_mapping = {v: k for k, v in self.mapping.items()}

    def fetch_mapping(self, m=0, table="events"):
        req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s' % (m, table)
        resp = requests.get(req)
        if resp.status_code == 200:
            return resp.text
        else:
            raise Exception('Error accessing register allocations: %d' % resp.status_code)

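    # The mapping fetched above is a JSON object of logical-to-physical
    # column names, e.g. {"hostname": "s4", ...} (illustrative only; the
    # actual names come from the allocation service).
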
    def run_query_old(self, query):
        req = 'http://cloud-scrutiny.appspot.com/command?action=send_query&q=%s' % urllib.quote(query)
        resp = requests.get(req)
        if resp.status_code == 200:
            return resp.text
        else:
            raise Exception('Error running query: %d' % resp.status_code)

    def run_query(self, query):
        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            credentials = run(FLOW, storage)

        http = httplib2.Http()
        http = credentials.authorize(http)

        service = build('bigquery', 'v2', http=http)

        body = {"query": query}
        response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

        fieldNames = []
        for field in response["schema"]["fields"]:
            fieldNames.append(field["name"])

        # translate each row back to logical column names via the reverse
        # mapping; columns with no mapping keep their BigQuery names
        result = []
        if "rows" in response:
            for row in response["rows"]:
                this_result = {}
                for (i, column) in enumerate(row["f"]):
                    this_result[self.reverse_mapping.get(fieldNames[i], fieldNames[i])] = column["v"]
                result.append(this_result)

        return result

    def remap(self, match):
        token = match.group()[1:]
        if token in self.mapping:
            return self.mapping[token]
        else:
            raise Exception('unknown token %s' % token)

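    # Illustration of remap() above (hypothetical mapping): with
    # {"hostname": "s4"}, the substitution in get_usage() turns
    # "SELECT %hostname FROM ..." into "SELECT s4 FROM ...".
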
    def get_usage(self, cp=None, hostname=None, site=None, slice=None, timeStart=-HOUR_MS, timeStop=-1, groupBy=["%hostname", "%cp"]):
        where = []
        if slice is not None:
            where.append("%slice='" + slice + "'")
        if cp is not None:
            where.append("%cp='" + cp + "'")
        if hostname is not None:
            where.append("%hostname='" + hostname + "'")
        if site is not None:
            # quote the site name so CONTAINS receives a string literal
            where.append("%hostname CONTAINS '" + site + "'")
        where.append("%bytes_sent>0")
        where = "WHERE " + " AND ".join(where)

        if timeStart is not None:
            tableName = "[vicci.demoevents@%d-%d]" % (timeStart, timeStop)
        else:
            tableName = "[vicci.demoevents]"

        query = "SELECT %hostname,%cp,sum(%bytes_sent) as sum_bytes_sent,sum(%bytes_hit) as sum_bytes_hit, AVG(%bandwidth) as avg_bandwidth," + \
                " MAX(TIMESTAMP_TO_MSEC(time))-MIN(TIMESTAMP_TO_MSEC(time)) as time_delta FROM " + \
                tableName + " " + where

        if groupBy:
            query = query + " GROUP BY " + ",".join(groupBy)

        # %tokens are letters and underscores ('A-Z', not 'A-z', which would
        # also match punctuation)
        p = re.compile('%[a-zA-Z_]*')
        query = p.sub(self.remap, query)

        rows = self.run_query(query)

        for row in rows:
            row["sum_bytes_sent"] = int(row.get("sum_bytes_sent", 0))
            row["sum_bytes_hit"] = int(row.get("sum_bytes_hit", 0))
            row["avg_bandwidth"] = int(float(row.get("avg_bandwidth", 0)))
            row["time_delta"] = float(row.get("time_delta", 0.0)) / 1000.0

            # guard against a zero-length window to avoid dividing by zero
            elapsed = max((timeStop - timeStart) / 1000, 1)
            row["KBps"] = int(row.get("sum_bytes_sent", 0)) / elapsed / 1024

        return rows

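    # Usage sketch for get_usage() above (assumes cached OAuth credentials
    # and a reachable mapping service):
    #   hq = HpcQuery()
    #   rows = hq.get_usage(timeStart=-MINUTE_MS*5)
    #   # -> [{"hostname": ..., "cp": ..., "sum_bytes_sent": ..., "KBps": ...}, ...]
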
    def sites_from_usage(self, rows, nodes_to_sites={}):
        sites = {}
        for row in rows:
            hostname = row["hostname"]

            if hostname in nodes_to_sites:
                site_name = nodes_to_sites[hostname]
            else:
                # assumes hostnames shaped like "node1.princeton.vicci.org";
                # the second label is taken as the site name
                parts = hostname.split(".")
                if len(parts) <= 2:
                    continue
                site_name = parts[1]

            if site_name not in sites:
                row = row.copy()
                row["site"] = site_name
                row["max_avg_bandwidth"] = row["avg_bandwidth"]
                # sites table doesn't care about hostnames or avg_bandwidth
                del row["hostname"]
                del row["avg_bandwidth"]
                sites[site_name] = row
            else:
                site_row = sites[site_name]
                site_row["sum_bytes_sent"] = site_row["sum_bytes_sent"] + row["sum_bytes_sent"]
                site_row["sum_bytes_hit"] = site_row["sum_bytes_hit"] + row["sum_bytes_hit"]
                site_row["max_avg_bandwidth"] = max(site_row["max_avg_bandwidth"], row["avg_bandwidth"])
                site_row["time_delta"] = max(site_row["time_delta"], row["time_delta"])

        return sites.values()

    def get_usage_sites(self, cp=None, slice=None, timeStart=-HOUR_MS, timeStop=-1):
        rows = self.get_usage(cp=cp, slice=slice, timeStart=timeStart, timeStop=timeStop)

        return self.sites_from_usage(rows)

    def dump_table(self, rows, keys=None):
        if not keys:
            keys = rows[0].keys()

        lens = {}
        for key in keys:
            lens[key] = len(key)

        for row in rows:
            for key in keys:
                thislen = len(str(row.get(key, "")))
                lens[key] = max(lens.get(key, 0), thislen)

        for key in keys:
            print "%*s" % (lens[key], key),
        print

        for row in rows:
            for key in keys:
                print "%*s" % (lens[key], str(row.get(key, ""))),
            print

class HpcQueryThread(HpcQuery, threading.Thread):
    def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None, nodes_to_sites={}):
        threading.Thread.__init__(self)
        HpcQuery.__init__(self)
        self.daemon = True
        self.interval = interval
        self.timeStart = timeStart
        self.nodes_to_sites = nodes_to_sites
        self.slice = slice
        self.cp = cp
        self.data_version = 0
        self.please_die = False
        self.update_time = time.time()
        self.start()

    def is_stalled(self):
        # consider the thread stalled if it hasn't updated in five minutes
        return time.time() - self.update_time > 300

    def run(self):
        while not self.please_die:
            try:
                self.rows = self.get_usage(timeStart=self.timeStart, cp=self.cp, slice=self.slice)
                self.site_rows = self.sites_from_usage(self.rows, self.nodes_to_sites)
                self.update_time = time.time()
                self.new_data()
                self.data_version += 1
            except Exception:
                # log the failure but keep polling
                with open("/tmp/hpcquery_fail.txt", "a") as f:
                    f.write(traceback.format_exc() + "\n")
            time.sleep(self.interval)

    def new_data(self):
        pass

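# HpcDumpThread turns the polling thread into a simple terminal dashboard by
# overriding the new_data() hook that HpcQueryThread invokes after each
# successful poll.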
class HpcDumpThread(HpcQueryThread):
    def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None):
        HpcQueryThread.__init__(self, interval, slice, timeStart, cp)

    def new_data(self):
        os.system("clear")

        print "update %d, data for last %d minutes" % (self.data_version, -self.timeStart/1000/60)
        print

        self.dump_table(self.rows, ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
        print
        self.dump_table(self.site_rows, ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
        print


def main_old():
    hq = HpcQuery()
#    print hq.mapping

    print "5 minute"
    hq.dump_table(hq.get_usage(timeStart=-MINUTE_MS*5), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
    print
    hq.dump_table(hq.get_usage_sites(timeStart=-MINUTE_MS*5), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
    print

    print "1 hour"
    hq.dump_table(hq.get_usage(), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
    print
    hq.dump_table(hq.get_usage_sites(), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
    print

    print "24 hours"
    hq.dump_table(hq.get_usage(timeStart=-HOUR_MS*24), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
    hq.dump_table(hq.get_usage_sites(timeStart=-HOUR_MS*24), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
    print

def main():
    hd = HpcDumpThread()
    while True:
        time.sleep(30)

if __name__ == "__main__":
    main()
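
# Running this module directly starts an HpcDumpThread, which clears the
# terminal and reprints the per-node and per-site usage tables every 30
# seconds until interrupted.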