fix python error when no rows in query
[plstackapi.git] / planetstack / hpc_wizard / planetstack_analytics.py
from bigquery_analytics import BigQueryAnalytics
import datetime
import re
import os
import sys
import time
import json
import traceback
import urllib2

if os.path.exists("/home/smbaker/projects/vicci/plstackapi/planetstack"):
    sys.path.append("/home/smbaker/projects/vicci/plstackapi/planetstack")
else:
    sys.path.append("/opt/planetstack")

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings")
from django import db
from django.db import connection
from core.models import Slice, Sliver, ServiceClass, Reservation, Tag, Network, User, Node, Image, Deployment, Site, NetworkTemplate, NetworkSlice, Service

BLUE_LOAD=5000000
RED_LOAD=15000000

glo_cached_queries = {}

class PlanetStackAnalytics(BigQueryAnalytics):
    def __init__(self, tableName="demoevents"):
        BigQueryAnalytics.__init__(self, tableName)

    def service_to_sliceNames(self, serviceName):
        service = Service.objects.get(name=serviceName)
        try:
            slices = service.slices.all()
        except:
            # BUG in data model -- Slice.service has related name 'service' and
            #                      it should be 'slices'
            slices = service.service.all()

        return [slice.name for slice in slices]

    def compose_query(self, slice=None, site=None, node=None, service=None, timeBucket="60", avg=[], sum=[], count=[], computed=[], val=[], groupBy=["Time"], orderBy=["Time"], tableName="demoevents", latest=False):
        # the @-3600000--1 decorator restricts the query to roughly the last
        # hour of data (the bounds are in milliseconds relative to now)
        tablePart = "[%s.%s@-3600000--1]" % ("vicci", tableName)

        fields = []
        fieldNames = []
        srcFieldNames = ["time"]

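        # quantize timestamps into timeBucket-second buckets so rows can be
        # grouped into fixed-width time intervals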
        fields.append("SEC_TO_TIMESTAMP(INTEGER(TIMESTAMP_TO_SEC(time)/%s)*%s) as Time" % (str(timeBucket),str(timeBucket)))
        #fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/%s)*%s as Time" % (str(timeBucket),str(timeBucket)))

        for fieldName in avg:
            fields.append("AVG(%s) as avg_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("avg_%s" % fieldName.replace("%",""))
            srcFieldNames.append(fieldName)

        for fieldName in sum:
            fields.append("SUM(%s) as sum_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("sum_%s" % fieldName.replace("%",""))
            srcFieldNames.append(fieldName)

        for fieldName in count:
            fields.append("COUNT(distinct %s) as count_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("count_%s" % fieldName.replace("%",""))
            srcFieldNames.append(fieldName)

        for fieldName in val:
            fields.append(fieldName)
            fieldNames.append(fieldName)
            srcFieldNames.append(fieldName)

        for fieldName in computed:
            operator = "/"
            parts = fieldName.split("/")
            if len(parts) == 1:
                # no "/" in the expression, so treat it as a product instead
                operator = "*"
                parts = fieldName.split("*")
                computedFieldName = "computed_" + parts[0].replace("%","") + "_mult_" + parts[1].replace("%","")
            else:
                computedFieldName = "computed_" + parts[0].replace("%","") + "_div_" + parts[1].replace("%","")
            fields.append("SUM(%s)%sSUM(%s) as %s" % (parts[0], operator, parts[1], computedFieldName))
            fieldNames.append(computedFieldName)
            srcFieldNames.append(parts[0])
            srcFieldNames.append(parts[1])

        for fieldName in groupBy:
            if (fieldName not in ["Time"]):
                fields.append(fieldName)
                fieldNames.append(fieldName)
                srcFieldNames.append(fieldName)

        fields = ", ".join(fields)

        where = []

        if slice:
            where.append("%%slice='%s'" % slice)
        if site:
            where.append("%%site='%s'" % site)
        if node:
            where.append("%%hostname='%s'" % node)
        if service:
            sliceNames = self.service_to_sliceNames(service)
            if sliceNames:
                where.append("(" + " OR ".join(["%%slice='%s'" % sliceName for sliceName in sliceNames]) + ")")

        if where:
            where = " WHERE " + " AND ".join(where)
        else:
            where = ""

        if groupBy:
            groupBySub = " GROUP BY " + ",".join(groupBy + ["%hostname"])
            groupBy = " GROUP BY " + ",".join(groupBy)
        else:
            groupBySub = " GROUP BY %hostname"
            groupBy = ""

        if orderBy:
            orderBy = " ORDER BY " + ",".join(orderBy)
        else:
            orderBy = ""

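        # when latest=True, replace the raw table with a subquery that keeps
        # only the most recent row for each (hostname, event) pair, found by
        # joining the table against a MAX(time)-per-group aggregate of itself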
        if latest:
            latestFields = ["table1.%s as %s" % (x,x) for x in srcFieldNames]
            latestFields = ", ".join(latestFields)
            tablePart = """(SELECT %s FROM %s AS table1
                            JOIN
                                (SELECT %%hostname, event, max(time) as maxtime from %s GROUP BY %%hostname, event) AS latest
                            ON
                                table1.%%hostname = latest.%%hostname AND table1.event = latest.event AND table1.time = latest.maxtime)""" % (latestFields, tablePart, tablePart)

        if computed:
            subQuery = "SELECT %%hostname, %s FROM %s" % (fields, tablePart)
            if where:
                subQuery = subQuery + where
            subQuery = subQuery + groupBySub

            sumFields = []
            for fieldName in fieldNames:
                if fieldName.startswith("avg"):
                    sumFields.append("AVG(%s) as avg_%s"%(fieldName,fieldName))
                    sumFields.append("MAX(%s) as max_%s"%(fieldName,fieldName))
                elif (fieldName.startswith("count")) or (fieldName.startswith("sum")) or (fieldName.startswith("computed")):
                    sumFields.append("SUM(%s) as sum_%s"%(fieldName,fieldName))
                else:
                    sumFields.append(fieldName)

            sumFields = ",".join(sumFields)

            query = "SELECT %s, %s FROM (%s)" % ("Time", sumFields, subQuery)
            if groupBy:
                query = query + groupBy
            if orderBy:
                query = query + orderBy
        else:
            query = "SELECT %s FROM %s" % (fields, tablePart)
            if where:
                query = query + where
            if groupBy:
                query = query + groupBy
            if orderBy:
                query = query + orderBy

        return query

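    # Illustrative example (not part of the original source): a call such as
    #   compose_query(slice="HyperCache", avg=["%cpu"], timeBucket="300")
    # composes roughly:
    #   SELECT SEC_TO_TIMESTAMP(INTEGER(TIMESTAMP_TO_SEC(time)/300)*300) as Time,
    #          AVG(%cpu) as avg_cpu
    #   FROM [vicci.demoevents@-3600000--1]
    #   WHERE %slice='HyperCache' GROUP BY Time ORDER BY Time
    # where the %-prefixed names are placeholders presumably mapped to real
    # column names by the BigQueryAnalytics layer.
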
    def get_list_from_req(self, req, name, default=[]):
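        # "@" is accepted in the request as a stand-in for "%" (which is
        # awkward to pass in a query string) and converted back here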
        value = req.GET.get(name, None)
        if not value:
            return default
        value = value.replace("@","%")
        return value.split(",")

    def format_result(self, format, result, query, dataSourceUrl):
        if (format == "json_dicts"):
            result = {"query": query, "rows": result, "dataSourceUrl": dataSourceUrl}
            return ("application/javascript", json.dumps(result))

        elif (format == "json_arrays"):
            new_result = []
            for row in result:
                new_row = []
                for key in sorted(row.keys()):
                    new_row.append(row[key])
                new_result.append(new_row)
            # wrap once, after all rows have been collected
            new_result = {"query": query, "rows": new_result}
            return ("application/javascript", json.dumps(new_result))

        elif (format == "html_table"):
            new_rows = []
            for row in result:
                new_row = []
                for key in sorted(row.keys()):
                    new_row.append("<TD>%s</TD>" % str(row[key]))
                new_rows.append("<TR>%s</TR>" % "".join(new_row))

            new_result = "<TABLE>%s</TABLE>" % "\n".join(new_rows)

            return ("text/html", new_result)

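    # Illustrative example (not part of the original source): with
    #   result = [{"Time": 1234, "avg_cpu": 12.0}]
    # format="json_arrays" yields {"query": ..., "rows": [[1234, 12.0]]},
    # with columns ordered by sorted key name.
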
    def merge_datamodel_sites(self, rows):
        """ For a query that included "site" in its groupBy, merge in the
            opencloud site information.
        """
        for row in rows:
            sitename = row["site"]
            try:
                model_site = Site.objects.get(name=sitename)
            except:
                # we didn't find it in the data model
                continue

            row["lat"] = float(model_site.location.latitude)
            row["long"] = float(model_site.location.longitude)
            row["url"] = model_site.site_url
            row["numNodes"] = model_site.nodes.count()

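            # map the site's max CPU reading onto a "hotness" value that is 0.0
            # when cpu*RED_LOAD equals BLUE_LOAD and 1.0 when it reaches
            # RED_LOAD, clamped below at zero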
            max_cpu = row.get("max_avg_cpu", row.get("max_cpu", 0))
            cpu = float(max_cpu)/100.0
            row["hotness"] = max(0.0, ((cpu*RED_LOAD) - BLUE_LOAD)/(RED_LOAD-BLUE_LOAD))

    def compose_latest_query(self, fieldNames=None, groupByFields=["%hostname", "event"]):
        """ Compose a query that returns the 'most recent' row for each (hostname, event)
            pair.
        """

        if not fieldNames:
            fieldNames = ["%hostname", "%bytes_sent", "time", "event", "%site", "%elapsed", "%slice", "%cpu"]

        fields = ["table1.%s AS %s" % (x,x) for x in fieldNames]
        fields = ", ".join(fields)

        tableDesc = "%s.%s" % (self.projectName, self.tableName)

        groupByOn = ["table1.time = latest.maxtime"]
        for field in groupByFields:
            groupByOn.append("table1.%s = latest.%s" % (field, field))

        groupByOn = " AND ".join(groupByOn)
        groupByFields = ", ".join(groupByFields)

        base_query = "SELECT %s FROM [%s@-3600000--1] AS table1 JOIN (SELECT %s, max(time) as maxtime from [%s@-3600000--1] GROUP BY %s) AS latest ON %s" % \
                      (fields, tableDesc, groupByFields, tableDesc, groupByFields, groupByOn)

        return base_query

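    # Illustrative shape of the composed query (not part of the original
    # source):
    #   SELECT table1.%hostname AS %hostname, ...
    #   FROM [project.table@-3600000--1] AS table1
    #   JOIN (SELECT %hostname, event, max(time) as maxtime
    #         FROM [project.table@-3600000--1]
    #         GROUP BY %hostname, event) AS latest
    #   ON table1.time = latest.maxtime AND table1.%hostname = latest.%hostname
    #      AND table1.event = latest.event
    # i.e. the table joined against its own per-(hostname, event) MAX(time).
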
    def get_cached_query_results(self, q):
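        # cache results for up to 60 seconds, keyed by the query string, so
        # repeated requests don't re-run the same BigQuery job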
        global glo_cached_queries

        if q in glo_cached_queries:
            if (time.time() - glo_cached_queries[q]["time"]) <= 60:
                print "using cached query"
                return glo_cached_queries[q]["rows"]

        print "refreshing cached query"
        result = self.run_query(q)
        glo_cached_queries[q] = {"time": time.time(), "rows": result}

        return result

    def process_request(self, req):
        print req.GET

        tqx = req.GET.get("tqx", None)

        slice = req.GET.get("slice", None)
        site = req.GET.get("site", None)
        node = req.GET.get("node", None)
        service = req.GET.get("service", None)

        format = req.GET.get("format", "json_dicts")

        timeBucket = req.GET.get("timeBucket", "60")
        avg = self.get_list_from_req(req, "avg")
        sum = self.get_list_from_req(req, "sum")
        count = self.get_list_from_req(req, "count")
        computed = self.get_list_from_req(req, "computed")
        groupBy = self.get_list_from_req(req, "groupBy", ["Time"])
        orderBy = self.get_list_from_req(req, "orderBy", ["Time"])

        maxRows = req.GET.get("maxRows", None)
        mergeDataModelSites = req.GET.get("mergeDataModelSites", None)

        cached = req.GET.get("cached", None)

        q = self.compose_query(slice, site, node, service, timeBucket, avg, sum, count, computed, [], groupBy, orderBy)

        print q

        dataSourceUrl = "http://" + req.META["SERVER_NAME"] + ":" + req.META["SERVER_PORT"] + req.META["PATH_INFO"] + "?" + req.META["QUERY_STRING"].replace("format=","origFormat=").replace("%","%25") + "&format=charts"

        if (format=="dataSourceUrl"):
            result = {"dataSourceUrl": dataSourceUrl}
            return ("application/javascript", json.dumps(result))

        elif (format=="raw"):
            result = self.run_query_raw(q)
            result["dataSourceUrl"] = dataSourceUrl
            return ("application/javascript", json.dumps(result))

        elif (format=="nodata"):
            result = {"dataSourceUrl": dataSourceUrl, "query": q}
            # return a (content-type, body) tuple, not a set
            return ("application/javascript", json.dumps(result))

        elif (format=="charts"):
            bq_result = self.run_query_raw(q)

            # cloudscrutiny code is probably better!
            table = {}
            table["cols"] = self.schema_to_cols(bq_result["schema"])
            rows = []
            # a query that matches nothing returns no "rows" key at all
            if "rows" in bq_result:
                for row in bq_result["rows"]:
                    rowcols = []
                    for (colnum,col) in enumerate(row["f"]):
                        if (colnum==0):
                            dt = datetime.datetime.fromtimestamp(float(col["v"]))
                            rowcols.append({"v": 'new Date("%s")' % dt.isoformat()})
                        else:
                            try:
                                rowcols.append({"v": float(col["v"])})
                            except:
                                rowcols.append({"v": col["v"]})
                    rows.append({"c": rowcols})
            table["rows"] = rows

            # tqx is a semicolon-separated list of key:value pairs, e.g.
            # "reqId:0"; pull out the reqId value rather than strip()ing its
            # characters from both ends
            reqId = "0"
            if tqx:
                for part in tqx.split(";"):
                    if part.startswith("reqId:"):
                        reqId = part[len("reqId:"):]

            result = {"status": "okColumnChart", "reqId": reqId, "table": table, "version": "0.6"}

            result = "google.visualization.Query.setResponse(" + json.dumps(result) + ");"

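            # JSON cannot carry a JavaScript Date object, so the dates above
            # were serialized as the string 'new Date("...")'; strip the
            # surrounding quotes after json.dumps so the visualization client
            # receives a Date constructor rather than a string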
            def unquote_it(x): return x.group()[1:-1].replace('\\"', '"')

            p = re.compile(r'"new Date\(\\"[^"]*\\"\)"')
            result = p.sub(unquote_it, result)

            return ("application/javascript", result)

        else:
            if cached:
                results = self.get_cached_query_results(self.compose_latest_query())

                filter = {}
                if slice:
                    filter["slice"] = slice
                if site:
                    filter["site"] = site
                if node:
                    filter["hostname"] = node

                result = self.postprocess_results(results, filter=filter, sum=sum, count=count, avg=avg, computed=computed, maxDeltaTime=120, groupBy=["doesnotexist"])
            else:
                result = self.run_query(q)

            if maxRows:
                result = result[-int(maxRows):]

            if mergeDataModelSites:
                self.merge_datamodel_sites(result)

            return self.format_result(format, result, q, dataSourceUrl)

def DoPlanetStackAnalytics(request):
    bq = PlanetStackAnalytics()
    result = bq.process_request(request)

    return result

def main():
    bq = PlanetStackAnalytics()

    q = bq.compose_latest_query()
    results = bq.run_query(q)

    results = bq.postprocess_results(results,
                                     #filter={"site": "Princeton"},
                                     groupBy=["site"],
                                     computed=["bytes_sent/elapsed"],
                                     sum=["bytes_sent", "computed_bytes_sent_div_elapsed"], avg=["cpu"],
                                     maxDeltaTime=60)

    bq.dump_table(results)

    q = bq.compose_query(sum=["%bytes_sent"], avg=["%cpu"], latest=True, groupBy=["Time", "%site"])
    print q
    bq.dump_table(bq.run_query(q))

    sys.exit(0)

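    # the examples below are unreachable; the sys.exit(0) above disables them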
    q = bq.compose_query(avg=["%cpu","%bandwidth"], count=["%hostname"], slice="HyperCache")
    print q
    bq.dump_table(bq.run_query(q))

    q = bq.compose_query(computed=["%bytes_sent/%elapsed"])
    print
    print q
    bq.dump_table(bq.run_query(q))

    q = bq.compose_query(timeBucket=60*60, avg=["%cpu"], count=["%hostname"], computed=["%bytes_sent/%elapsed"])
    print
    print q
    bq.dump_table(bq.run_query(q))

if __name__ == "__main__":
    main()