# planetstack/hpc_wizard/planetstack_analytics.py
from bigquery_analytics import BigQueryAnalytics
import datetime
import re
import os
import sys
import time
import json
import traceback
import urllib2

# Prefer a developer checkout of planetstack if one exists; otherwise use the
# installed location.
if os.path.exists("/home/smbaker/projects/vicci/plstackapi/planetstack"):
    sys.path.append("/home/smbaker/projects/vicci/plstackapi/planetstack")
else:
    sys.path.append("/opt/planetstack")

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings")
from django.conf import settings
from django import db
from django.db import connection
from core.models import Slice, Sliver, ServiceClass, Reservation, Tag, Network, User, Node, Image, Deployment, Site, NetworkTemplate, NetworkSlice, Service

# Load thresholds used by merge_datamodel_sites() to map CPU load onto a
# blue-to-red "hotness" scale.
BLUE_LOAD = 5000000
RED_LOAD = 15000000

# Module-level cache of query results, keyed by query string.
glo_cached_queries = {}

class PlanetStackAnalytics(BigQueryAnalytics):
    def __init__(self, tableName=None):
        # Fall back to the table named in the Django settings.
        if not tableName:
            tableName = settings.BIGQUERY_TABLE

        BigQueryAnalytics.__init__(self, tableName)

    def service_to_sliceNames(self, serviceName):
        service = Service.objects.get(name=serviceName)
        try:
            slices = service.slices.all()
        except AttributeError:
            # BUG in data model -- Slice.service has related name 'service' and
            #                      it should be 'slices'
            slices = service.service.all()

        return [slice.name for slice in slices]

    def compose_query(self, slice=None, site=None, node=None, service=None, event="libvirt_heartbeat", timeBucket="60", avg=[], sum=[], count=[], computed=[], val=[], groupBy=["Time"], orderBy=["Time"], tableName=None, latest=False, maxAge=60*60):
        if tableName is None:
            tableName = self.tableName

        # maxAge is given in seconds; BigQuery table decorators take milliseconds.
        maxAge = maxAge * 1000
        tablePart = "[%s.%s@-%d--1]" % (self.projectName, tableName, maxAge)

        fields = []
        fieldNames = []
        srcFieldNames = ["time"]

        # Bucket each timestamp into timeBucket-second intervals.
        fields.append("SEC_TO_TIMESTAMP(INTEGER(TIMESTAMP_TO_SEC(time)/%s)*%s) as Time" % (str(timeBucket),str(timeBucket)))
        #fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/%s)*%s as Time" % (str(timeBucket),str(timeBucket)))

        # Each requested aggregate becomes an AVG/SUM/COUNT column; the "%"
        # prefix is stripped from the output column name.
        for fieldName in avg:
            fields.append("AVG(%s) as avg_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("avg_%s" % fieldName.replace("%",""))
            srcFieldNames.append(fieldName)

        for fieldName in sum:
            fields.append("SUM(%s) as sum_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("sum_%s" % fieldName.replace("%",""))
            srcFieldNames.append(fieldName)

        for fieldName in count:
            fields.append("COUNT(distinct %s) as count_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("count_%s" % fieldName.replace("%",""))
            srcFieldNames.append(fieldName)

        # "val" fields are passed through unaggregated.
        for fieldName in val:
            fields.append(fieldName)
            fieldNames.append(fieldName)
            srcFieldNames.append(fieldName)

        for fieldName in computed:
            # A computed field is a ratio ("a/b") or product ("a*b") of two
            # source fields, aggregated as SUM(a) op SUM(b).
            if "/" in fieldName:
                operator = "/"
                parts = fieldName.split("/")
                computedFieldName = "computed_" + parts[0].replace("%","") + "_div_" + parts[1].replace("%","")
            else:
                operator = "*"
                parts = fieldName.split("*")
                computedFieldName = "computed_" + parts[0].replace("%","") + "_mult_" + parts[1].replace("%","")
            fields.append("SUM(%s)%sSUM(%s) as %s" % (parts[0], operator, parts[1], computedFieldName))
            fieldNames.append(computedFieldName)
            srcFieldNames.append(parts[0])
            srcFieldNames.append(parts[1])

        # Non-Time groupBy columns must also appear in the SELECT list.
        for fieldName in groupBy:
            if (fieldName not in ["Time"]):
                fields.append(fieldName)
                fieldNames.append(fieldName)
                srcFieldNames.append(fieldName)

        fields = ", ".join(fields)

        where = []

        if slice:
            where.append("%%slice='%s'" % slice)
        if site:
            where.append("%%site='%s'" % site)
        if node:
            where.append("%%hostname='%s'" % node)
        if event:
            where.append("event='%s'" % event)
        if service:
            # A service filter matches any of that service's slices.
            sliceNames = self.service_to_sliceNames(service)
            if sliceNames:
                where.append("(" + " OR ".join(["%%slice='%s'" % sliceName for sliceName in sliceNames]) + ")")

        if where:
            where = " WHERE " + " AND ".join(where)
        else:
            where = ""

        if groupBy:
            # The computed-field subquery additionally groups by hostname.
            groupBySub = " GROUP BY " + ",".join(groupBy + ["%hostname"])
            groupBy = " GROUP BY " + ",".join(groupBy)
        else:
            groupBySub = " GROUP BY %hostname"
            groupBy = ""

        if orderBy:
            orderBy = " ORDER BY " + ",".join(orderBy)
        else:
            orderBy = ""

        if latest:
            # Restrict the table to the most recent row per (hostname, event)
            # by self-joining against the per-pair max(time).
            latestFields = ["table1.%s as %s" % (x,x) for x in srcFieldNames]
            latestFields = ", ".join(latestFields)
            tablePart = """(SELECT %s FROM %s AS table1
                            JOIN
                                (SELECT %%hostname, event, max(time) as maxtime from %s GROUP BY %%hostname, event) AS latest
                            ON
                                table1.%%hostname = latest.%%hostname AND table1.event = latest.event AND table1.time = latest.maxtime)""" % (latestFields, tablePart, tablePart)

        if computed:
            # Computed fields take two passes: aggregate per host in a subquery,
            # then roll the per-host rows up in an outer query.
            subQuery = "SELECT %%hostname, %s FROM %s" % (fields, tablePart)
            if where:
                subQuery = subQuery + where
            subQuery = subQuery + groupBySub

            sumFields = []
            for fieldName in fieldNames:
                if fieldName.startswith("avg"):
                    sumFields.append("AVG(%s) as avg_%s"%(fieldName,fieldName))
                    sumFields.append("MAX(%s) as max_%s"%(fieldName,fieldName))
                elif (fieldName.startswith("count")) or (fieldName.startswith("sum")) or (fieldName.startswith("computed")):
                    sumFields.append("SUM(%s) as sum_%s"%(fieldName,fieldName))
                else:
                    sumFields.append(fieldName)

            sumFields = ",".join(sumFields)

            query = "SELECT %s, %s FROM (%s)" % ("Time", sumFields, subQuery)
            if groupBy:
                query = query + groupBy
            if orderBy:
                query = query + orderBy
        else:
            query = "SELECT %s FROM %s" % (fields, tablePart)
            if where:
                query = query + " " + where
            if groupBy:
                query = query + groupBy
            if orderBy:
                query = query + orderBy

        return query

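    # A minimal sketch of typical usage, assuming a table whose rows carry
    # %cpu and %hostname columns (illustrative names, not a live schema):
    #
    #     q = bq.compose_query(avg=["%cpu"], count=["%hostname"], timeBucket=300)
    #
    # produces, roughly:
    #
    #     SELECT SEC_TO_TIMESTAMP(INTEGER(TIMESTAMP_TO_SEC(time)/300)*300) as Time,
    #            AVG(%cpu) as avg_cpu, COUNT(distinct %hostname) as count_hostname
    #     FROM [<project>.<table>@-3600000--1]
    #     WHERE event='libvirt_heartbeat'
    #     GROUP BY Time ORDER BY Time
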
    def get_list_from_req(self, req, name, default=[]):
        # Parse a comma-separated query parameter into a list; "@" is accepted
        # as a stand-in for "%" (which would otherwise need URL-escaping).
        value = req.GET.get(name, None)
        if not value:
            return default
        value = value.replace("@", "%")
        return value.split(",")

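    # For example, "?avg=@cpu,@bandwidth" parses to ["%cpu", "%bandwidth"].
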
    def format_result(self, format, result, query, dataSourceUrl):
        if (format == "json_dicts"):
            result = {"query": query, "rows": result, "dataSourceUrl": dataSourceUrl}
            return ("application/javascript", json.dumps(result))

        elif (format == "json_arrays"):
            new_result = []
            for row in result:
                new_row = []
                for key in sorted(row.keys()):
                    new_row.append(row[key])
                new_result.append(new_row)
            new_result = {"query": query, "rows": new_result}
            return ("application/javascript", json.dumps(new_result))

        elif (format == "html_table"):
            new_rows = []
            for row in result:
                new_row = []
                for key in sorted(row.keys()):
                    new_row.append("<TD>%s</TD>" % str(row[key]))
                new_rows.append("<TR>%s</TR>" % "".join(new_row))

            new_result = "<TABLE>%s</TABLE>" % "\n".join(new_rows)

            return ("text/html", new_result)

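    # Shapes of the three formats, for a single result row {"Time": 0, "avg_cpu": 1.5}:
    #   json_dicts:  {"query": "...", "rows": [{"Time": 0, "avg_cpu": 1.5}], "dataSourceUrl": "..."}
    #   json_arrays: {"query": "...", "rows": [[0, 1.5]]}
    #   html_table:  <TABLE><TR><TD>0</TD><TD>1.5</TD></TR></TABLE>
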
    def merge_datamodel_sites(self, rows, slice=None):
        """ For a query that included "site" in its groupby, merge in the
            opencloud site information.
        """

        if slice:
            try:
                slice = Slice.objects.get(name=slice)
            except Slice.DoesNotExist:
                slice = None

        for row in rows:
            sitename = row["site"]
            try:
                model_site = Site.objects.get(name=sitename)
            except Site.DoesNotExist:
                # we didn't find it in the data model
                continue

            # Count how many of the slice's slivers are allocated at this site.
            allocated_slivers = 0
            if model_site and slice:
                for sliver in slice.slivers.all():
                    if sliver.node.site == model_site:
                        allocated_slivers = allocated_slivers + 1

            row["lat"] = float(model_site.location.latitude)
            row["long"] = float(model_site.location.longitude)
            row["url"] = model_site.site_url
            row["numNodes"] = model_site.nodes.count()
            row["allocated_slivers"] = allocated_slivers

            # Map peak CPU onto a 0..1 "hotness" scale between BLUE_LOAD and RED_LOAD.
            max_cpu = row.get("max_avg_cpu", row.get("max_cpu", 0))
            cpu = float(max_cpu) / 100.0
            row["hotness"] = max(0.0, ((cpu * RED_LOAD) - BLUE_LOAD) / (RED_LOAD - BLUE_LOAD))

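    # Worked example: max_cpu=50 gives cpu=0.5, so
    # hotness = (0.5*15000000 - 5000000) / (15000000 - 5000000) = 0.25.
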
    def compose_latest_query(self, fieldNames=None, groupByFields=["%hostname", "event"]):
        """ Compose a query that returns the 'most recent' row for each (hostname, event)
            pair.
        """

        if not fieldNames:
            fieldNames = ["%hostname", "%bytes_sent", "time", "event", "%site", "%elapsed", "%slice", "%cpu"]

        fields = ["table1.%s AS %s" % (x,x) for x in fieldNames]
        fields = ", ".join(fields)

        tableDesc = "%s.%s" % (self.projectName, self.tableName)

        # Join each row to the per-group max(time) so only the newest row survives.
        groupByOn = ["table1.time = latest.maxtime"]
        for field in groupByFields:
            groupByOn.append("table1.%s = latest.%s" % (field, field))

        groupByOn = " AND ".join(groupByOn)
        groupByFields = ", ".join(groupByFields)

        # The @-3600000--1 table decorator restricts the scan to the last hour.
        base_query = "SELECT %s FROM [%s@-3600000--1] AS table1 JOIN (SELECT %s, max(time) as maxtime from [%s@-3600000--1] GROUP BY %s) AS latest ON %s" % \
                      (fields, tableDesc, groupByFields, tableDesc, groupByFields, groupByOn)

        return base_query

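    # The generated SQL has this shape (project/table names depend on configuration):
    #
    #     SELECT table1.%hostname AS %hostname, ... FROM [proj.table@-3600000--1] AS table1
    #     JOIN (SELECT %hostname, event, max(time) as maxtime
    #           from [proj.table@-3600000--1] GROUP BY %hostname, event) AS latest
    #     ON table1.time = latest.maxtime AND table1.%hostname = latest.%hostname
    #        AND table1.event = latest.event
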
    def get_cached_query_results(self, q, wait=True):
        # Return cached rows if they are under 60 seconds old; otherwise re-run
        # the query, or return None immediately if wait is False.
        global glo_cached_queries

        if q in glo_cached_queries:
            if (time.time() - glo_cached_queries[q]["time"]) <= 60:
                print "using cached query"
                return glo_cached_queries[q]["rows"]

        if not wait:
            return None

        print "refreshing cached query"
        result = self.run_query(q)
        glo_cached_queries[q] = {"time": time.time(), "rows": result}

        return result

    def process_request(self, req):
        print req.GET

        tqx = req.GET.get("tqx", None)

        slice = req.GET.get("slice", None)
        site = req.GET.get("site", None)
        node = req.GET.get("node", None)
        service = req.GET.get("service", None)
        event = req.GET.get("event", "libvirt_heartbeat")

        format = req.GET.get("format", "json_dicts")

        timeBucket = int(req.GET.get("timeBucket", 60))
        avg = self.get_list_from_req(req, "avg")
        sum = self.get_list_from_req(req, "sum")
        count = self.get_list_from_req(req, "count")
        computed = self.get_list_from_req(req, "computed")
        groupBy = self.get_list_from_req(req, "groupBy", ["Time"])
        orderBy = self.get_list_from_req(req, "orderBy", ["Time"])

        maxRows = req.GET.get("maxRows", None)
        mergeDataModelSites = req.GET.get("mergeDataModelSites", None)

        maxAge = int(req.GET.get("maxAge", 60*60))

        cached = req.GET.get("cached", None)

        q = self.compose_query(slice, site, node, service, event, timeBucket, avg, sum, count, computed, [], groupBy, orderBy, maxAge=maxAge)

        print q

        # Rebuild this request's URL as a google.visualization data source,
        # re-targeting the format and %-escaping the query string.
        dataSourceUrl = "http://" + req.META["SERVER_NAME"] + ":" + req.META["SERVER_PORT"] + req.META["PATH_INFO"] + "?" + req.META["QUERY_STRING"].replace("format=","origFormat=").replace("%","%25") + "&format=charts"

        if (format == "dataSourceUrl"):
            result = json.dumps({"dataSourceUrl": dataSourceUrl})
            return ("application/javascript", result)

        elif (format == "raw"):
            result = self.run_query_raw(q)
            result["dataSourceUrl"] = dataSourceUrl

            result = json.dumps(result)

            return ("application/javascript", result)

        elif (format == "nodata"):
            result = {"dataSourceUrl": dataSourceUrl, "query": q}
            result = json.dumps(result)
            return ("application/javascript", result)

        elif (format == "charts"):
            bq_result = self.run_query_raw(q)

            # cloudscrutiny code is probably better!
            table = {}
            table["cols"] = self.schema_to_cols(bq_result["schema"])
            rows = []
            if "rows" in bq_result:
                for row in bq_result["rows"]:
                    rowcols = []
                    for (colnum, col) in enumerate(row["f"]):
                        if (colnum == 0):
                            # Column 0 is the time bucket; emit a JavaScript Date literal.
                            dt = datetime.datetime.fromtimestamp(float(col["v"]))
                            rowcols.append({"v": 'new Date("%s")' % dt.isoformat()})
                        else:
                            try:
                                rowcols.append({"v": float(col["v"])})
                            except (ValueError, TypeError):
                                rowcols.append({"v": col["v"]})
                    rows.append({"c": rowcols})
            table["rows"] = rows

            # tqx is a list of key:value pairs separated by ";"; pull out reqId.
            reqId = "0"
            if tqx:
                for part in tqx.split(";"):
                    if part.startswith("reqId:"):
                        reqId = part[len("reqId:"):]

            result = {"status": "okColumnChart", "reqId": reqId, "table": table, "version": "0.6"}

            result = "google.visualization.Query.setResponse(" + json.dumps(result) + ");"

            # json.dumps quoted the Date literals; strip the surrounding quotes
            # so the client evaluates them as Date objects rather than strings.
            def unquote_it(x): return x.group()[1:-1].replace('\\"', '"')

            p = re.compile(r'"new Date\(\\"[^"]*\\"\)"')
            result = p.sub(unquote_it, result)

            return ("application/javascript", result)

        else:
            if cached:
                # Answer from the cached "latest" rows and aggregate locally,
                # rather than issuing a new BigQuery query per request.
                results = self.get_cached_query_results(self.compose_latest_query())

                filter = {}
                if slice:
                    filter["slice"] = slice
                if site:
                    filter["site"] = site
                if node:
                    filter["hostname"] = node
                if event:
                    filter["event"] = event

                result = self.postprocess_results(results, filter=filter, sum=sum, count=count, avg=avg, computed=computed, maxDeltaTime=120, groupBy=["doesnotexist"])
            else:
                result = self.run_query(q)

            if maxRows:
                result = result[-int(maxRows):]

            if mergeDataModelSites:
                self.merge_datamodel_sites(result)

            return self.format_result(format, result, q, dataSourceUrl)

def DoPlanetStackAnalytics(request):
    bq = PlanetStackAnalytics()
    result = bq.process_request(request)

    return result

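# A minimal sketch of how the view above might be routed in urls.py; the
# actual pattern lives elsewhere in the repo, so treat this as illustrative:
#
#     url(r'^analytics/bigquery/$',
#         'hpc_wizard.planetstack_analytics.DoPlanetStackAnalytics')
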
def main():
    bq = PlanetStackAnalytics(tableName="demoevents")

    q = bq.compose_latest_query(groupByFields=["%hostname", "event", "%slice"])
    results = bq.run_query(q)

    #results = bq.postprocess_results(results,
    #                                 filter={"slice": "HyperCache"},
    #                                 groupBy=["site"],
    #                                 computed=["bytes_sent/elapsed"],
    #                                 sum=["bytes_sent", "computed_bytes_sent_div_elapsed"], avg=["cpu"],
    #                                 maxDeltaTime=60)

    results = bq.postprocess_results(results, filter={"slice": "HyperCache"}, maxi=["cpu"], count=["hostname"], computed=["bytes_sent/elapsed"], groupBy=["Time", "site"], maxDeltaTime=80)

    bq.dump_table(results)

    sys.exit(0)

    # The examples below are unreachable because of the sys.exit() above;
    # remove it to exercise them.
    q = bq.compose_query(sum=["%bytes_sent"], avg=["%cpu"], latest=True, groupBy=["Time", "%site"])
    print q
    bq.dump_table(bq.run_query(q))

    q = bq.compose_query(avg=["%cpu","%bandwidth"], count=["%hostname"], slice="HyperCache")
    print q
    bq.dump_table(bq.run_query(q))

    q = bq.compose_query(computed=["%bytes_sent/%elapsed"])
    print
    print q
    bq.dump_table(bq.run_query(q))

    q = bq.compose_query(timeBucket=60*60, avg=["%cpu"], count=["%hostname"], computed=["%bytes_sent/%elapsed"])
    print
    print q
    bq.dump_table(bq.run_query(q))

if __name__ == "__main__":
    main()