[plstackapi.git] / planetstack / hpc_wizard / planetstack_analytics.py
from bigquery_analytics import BigQueryAnalytics
import datetime
import re
import os
import sys
import time
import json
import traceback
import urllib2

if os.path.exists("/home/smbaker/projects/vicci/plstackapi/planetstack"):
    sys.path.append("/home/smbaker/projects/vicci/plstackapi/planetstack")
else:
    sys.path.append("/opt/planetstack")

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings")
from django.conf import settings
from django import db
from django.db import connection
from core.models import Slice, Sliver, ServiceClass, Reservation, Tag, Network, User, Node, Image, Deployment, Site, NetworkTemplate, NetworkSlice, Service

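# Thresholds used by merge_datamodel_sites() below to scale a site's observed
# load into a 0.0-1.0 "hotness" value.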
BLUE_LOAD=5000000
RED_LOAD=15000000

glo_cached_queries = {}

class PlanetStackAnalytics(BigQueryAnalytics):
    def __init__(self, tableName=None):
        if not tableName:
            tableName = settings.BIGQUERY_TABLE

        BigQueryAnalytics.__init__(self, tableName)

    def service_to_sliceNames(self, serviceName):
        service=Service.objects.get(name=serviceName)
        try:
            slices = service.slices.all()
        except:
            # BUG in data model -- Slice.service has related name 'service' and
            #                      it should be 'slices'
            slices = service.service.all()

        return [slice.name for slice in slices]

    def compose_query(self, filter={}, timeBucket="60", avg=[], sum=[], count=[], computed=[], val=[], groupBy=["Time"], orderBy=["Time"], tableName=None, latest=False, maxAge=60*60):
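        """ Build a BigQuery SQL string over the measurement table.

            filter     -- dict of optional constraints; recognized keys are
                          "slice", "site", "node"/"hostname", "event", and
                          "service" (a service is expanded to the names of its
                          slices).
            timeBucket -- width, in seconds, of the time buckets rows are
                          aggregated into.
            avg/sum/count -- lists of field names to AVG(), SUM(), or
                          COUNT(DISTINCT) per bucket.
            computed   -- list of "a/b" (or "a*b") expressions, emitted as
                          SUM(a)/SUM(b) and aggregated again in an outer query.
            val        -- fields to select unaggregated.
            groupBy/orderBy -- lists of output column names; "Time" refers to
                          the bucketed timestamp.
            latest     -- if True, restrict the source table to the most recent
                          row per (hostname, event) before aggregating.
            maxAge     -- how far back to look, in seconds (table decorator).

            Returns the query as a string; it is not executed here.
        """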
        if tableName is None:
            tableName = self.tableName

        maxAge = maxAge * 1000
        tablePart = "[%s.%s@-%d--1]" % ("vicci", tableName, maxAge)

        fields = []
        fieldNames = []
        srcFieldNames = ["time"]

        fields.append("SEC_TO_TIMESTAMP(INTEGER(TIMESTAMP_TO_SEC(time)/%s)*%s) as Time" % (str(timeBucket),str(timeBucket)))
        #fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/%s)*%s as Time" % (str(timeBucket),str(timeBucket)))

        for fieldName in avg:
            fields.append("AVG(%s) as avg_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("avg_%s" % fieldName.replace("%",""))
            srcFieldNames.append(fieldName)

        for fieldName in sum:
            fields.append("SUM(%s) as sum_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("sum_%s" % fieldName.replace("%",""))
            srcFieldNames.append(fieldName)

        for fieldName in count:
            fields.append("COUNT(distinct %s) as count_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("count_%s" % fieldName.replace("%",""))
            srcFieldNames.append(fieldName)

        for fieldName in val:
            fields.append(fieldName)
            fieldNames.append(fieldName)
            srcFieldNames.append(fieldName)

        for fieldName in computed:
            # computed metrics are specified as "a/b" or "a*b" and emitted as
            # SUM(a)<op>SUM(b); e.g. "%bytes_sent/%elapsed" becomes
            # SUM(%bytes_sent)/SUM(%elapsed) as computed_bytes_sent_div_elapsed
            parts = fieldName.split("/")
            if len(parts) == 2:
                operator = "/"
                computedFieldName = "computed_" + parts[0].replace("%","") + "_div_" + parts[1].replace("%","")
            else:
                operator = "*"
                parts = fieldName.split("*")
                computedFieldName = "computed_" + parts[0].replace("%","") + "_mult_" + parts[1].replace("%","")
            fields.append("SUM(%s)%sSUM(%s) as %s" % (parts[0], operator, parts[1], computedFieldName))
            fieldNames.append(computedFieldName)
            srcFieldNames.append(parts[0])
            srcFieldNames.append(parts[1])

        for fieldName in groupBy:
            if (fieldName not in ["Time"]):
                fields.append(fieldName)
                fieldNames.append(fieldName)
                srcFieldNames.append(fieldName)

        fields = ", ".join(fields)

        where = []

        if filter.get("slice",None):
            where.append("%%slice='%s'" % filter["slice"])
        if filter.get("site",None):
            where.append("%%site='%s'" % filter["site"])
        # process_request() passes the node filter under "hostname"; accept either key
        node = filter.get("node", filter.get("hostname", None))
        if node:
            where.append("%%hostname='%s'" % node)
        if filter.get("event",None):
            where.append("event='%s'" % filter["event"])
        if filter.get("service",None):
            sliceNames = self.service_to_sliceNames(filter["service"])
            if sliceNames:
                where.append("(" + " OR ".join(["%%slice='%s'" % sliceName for sliceName in sliceNames]) + ")")

        if where:
            where = " WHERE " + " AND ".join(where)
        else:
            where = ""

        if groupBy:
            groupBySub = " GROUP BY " + ",".join(groupBy + ["%hostname"])
            groupBy = " GROUP BY " + ",".join(groupBy)
        else:
            groupBySub = " GROUP BY %hostname"
            groupBy = ""

        if orderBy:
            orderBy = " ORDER BY " + ",".join(orderBy)
        else:
            orderBy = ""

        if latest:
            latestFields = ["table1.%s as %s" % (x,x) for x in srcFieldNames]
            latestFields = ", ".join(latestFields)
            tablePart = """(SELECT %s FROM %s AS table1
                            JOIN
                                (SELECT %%hostname, event, max(time) as maxtime from %s GROUP BY %%hostname, event) AS latest
                            ON
                                table1.%%hostname = latest.%%hostname AND table1.event = latest.event AND table1.time = latest.maxtime)""" % (latestFields, tablePart, tablePart)

        if computed:
            subQuery = "SELECT %%hostname, %s FROM %s" % (fields, tablePart)
            if where:
                subQuery = subQuery + where
            subQuery = subQuery + groupBySub

            sumFields = []
            for fieldName in fieldNames:
                if fieldName.startswith("avg"):
                    sumFields.append("AVG(%s) as avg_%s"%(fieldName,fieldName))
                    sumFields.append("MAX(%s) as max_%s"%(fieldName,fieldName))
                elif (fieldName.startswith("count")) or (fieldName.startswith("sum")) or (fieldName.startswith("computed")):
                    sumFields.append("SUM(%s) as sum_%s"%(fieldName,fieldName))
                else:
                    sumFields.append(fieldName)

            sumFields = ",".join(sumFields)

            query = "SELECT %s, %s FROM (%s)" % ("Time", sumFields, subQuery)
            if groupBy:
                query = query + groupBy
            if orderBy:
                query = query + orderBy
        else:
            query = "SELECT %s FROM %s" % (fields, tablePart)
            if where:
                query = query + " " + where
            if groupBy:
                query = query + groupBy
            if orderBy:
                query = query + orderBy

        return query

    def get_list_from_req(self, req, name, default=[]):
        value = req.GET.get(name, None)
        if not value:
            return default
        value = value.replace("@","%")
        return value.split(",")

    def format_result(self, format, result, query, dataSourceUrl):
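        """ Render query results in one of the supported output formats:
            "json_dicts" (list of row dicts), "json_arrays" (list of row value
            lists), or "html_table". Returns a (content_type, body) tuple.
        """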
        if (format == "json_dicts"):
            result = {"query": query, "rows": result, "dataSourceUrl": dataSourceUrl}
            return ("application/javascript", json.dumps(result))

        elif (format == "json_arrays"):
            new_result = []
            for row in result:
                new_row = []
                for key in sorted(row.keys()):
                    new_row.append(row[key])
                new_result.append(new_row)
            # wrap the rows once, after the loop completes
            new_result = {"query": query, "rows": new_result}
            return ("application/javascript", json.dumps(new_result))

        elif (format == "html_table"):
            new_rows = []
            for row in result:
                new_row = []
                for key in sorted(row.keys()):
                    new_row.append("<TD>%s</TD>" % str(row[key]))
                new_rows.append("<TR>%s</TR>" % "".join(new_row))

            new_result = "<TABLE>%s</TABLE>" % "\n".join(new_rows)

            return ("text/html", new_result)

    def merge_datamodel_sites(self, rows, slice=None):
        """ For a query that included "site" in its groupby, merge in the
            opencloud site information.
        """
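        # Each row must carry a "site" column naming the site; rows whose site
        # is found in the data model gain lat, long, url, numNodes,
        # allocated_slivers, and a 0.0-1.0 "hotness" score.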

        if slice:
            try:
                slice = Slice.objects.get(name=slice)
            except:
                slice = None

        for row in rows:
            sitename = row["site"]
            try:
                model_site = Site.objects.get(name=sitename)
            except:
                # we didn't find it in the data model
                continue

            allocated_slivers = 0
            if model_site and slice:
                for sliver in slice.slivers.all():
                    if sliver.node.site == model_site:
                        allocated_slivers = allocated_slivers + 1

            row["lat"] = float(model_site.location.latitude)
            row["long"] = float(model_site.location.longitude)
            row["url"] = model_site.site_url
            row["numNodes"] = model_site.nodes.count()
            row["allocated_slivers"] = allocated_slivers

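            # hotness ramps linearly from 0.0 when cpu*RED_LOAD reaches BLUE_LOAD
            # (about 33% CPU with the defaults above) up to 1.0 at 100% CPU.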
            max_cpu = row.get("max_avg_cpu", row.get("max_cpu", 0))
            cpu = float(max_cpu)/100.0
            row["hotness"] = max(0.0, ((cpu*RED_LOAD) - BLUE_LOAD)/(RED_LOAD-BLUE_LOAD))

    def compose_cached_query(self, querySpec='default'):
        """ Compose a query that returns the most recent row for each
            (hostname, event) pair.

            Note that groupByFields cannot contain any fields that are NULL in
            some rows, or those rows will be excluded. For example, if
            groupByFields includes cp, then no libvirt_event rows will be
            returned, since libvirt_event rows do not have cp.

            This means we can't really have one query to rule them all. Instead,
            we settle on a couple of different queries and let the caller
            specify which one to use via querySpec.
        """
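        # The generated query has roughly this shape:
        #   SELECT table1.<fields> FROM [project.table@-3600000--1] AS table1
        #   JOIN (SELECT <groupByFields>, max(time) AS maxtime
        #         FROM [project.table@-3600000--1] GROUP BY <groupByFields>) AS latest
        #   ON table1.time = latest.maxtime
        #      AND table1.<groupByFields> = latest.<groupByFields>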

        fieldNames = ["%hostname", "%bytes_sent", "%bytes_hit", "%healthy", "time", "event", "%site", "%elapsed", "%cpu"]

        if querySpec=="default":
            groupByFields = ["%hostname", "event"]
        elif (querySpec=="hpc"):
            fieldNames.append("%cp")
            groupByFields = ["%hostname", "event", "%cp"]
        else:
            raise ValueError("Unknown queryspec %s" % querySpec)

        fields = ["table1.%s AS %s" % (x,x) for x in fieldNames]
        fields = ", ".join(fields)

        tableDesc = "%s.%s" % (self.projectName, self.tableName)

        groupByOn = ["table1.time = latest.maxtime"]
        for field in groupByFields:
            groupByOn.append("table1.%s = latest.%s" % (field, field))

        groupByOn = " AND ".join(groupByOn)
        groupByFields = ", ".join(groupByFields)

        base_query = "SELECT %s FROM [%s@-3600000--1] AS table1 JOIN (SELECT %s, max(time) as maxtime from [%s@-3600000--1] GROUP BY %s) AS latest ON %s" % \
                      (fields, tableDesc, groupByFields, tableDesc, groupByFields, groupByOn)

        return base_query

    def get_cached_query_results(self, q, wait=True):
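        """ Run query q through the module-level cache; results newer than 60
            seconds are reused. If the cached entry is missing or stale and
            wait is False, return None instead of re-running the query.
        """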
        global glo_cached_queries

        if q in glo_cached_queries:
            if (time.time() - glo_cached_queries[q]["time"]) <= 60:
                print "using cached query"
                return glo_cached_queries[q]["rows"]

        if not wait:
            return None

        print "refreshing cached query"
        result = self.run_query(q)
        glo_cached_queries[q] = {"time": time.time(), "rows": result}

        return result

    def process_request(self, req):
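        """ Handle an analytics HTTP request. Recognized GET parameters include
            slice, site, node, service, event, cp, format, timeBucket, avg, sum,
            count, computed, groupBy, orderBy, maxRows, mergeDataModelSites,
            maxAge, cached, cachedGroupBy, and tqx. Returns a
            (content_type, body) tuple.
        """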
        print req.GET

        tqx = req.GET.get("tqx", None)

        slice = req.GET.get("slice", None)
        site = req.GET.get("site", None)
        node = req.GET.get("node", None)
        service = req.GET.get("service", None)
        event = req.GET.get("event", "libvirt_heartbeat")
        cp = req.GET.get("cp", None)

        format = req.GET.get("format", "json_dicts")

        timeBucket = int(req.GET.get("timeBucket", 60))
        avg = self.get_list_from_req(req, "avg")
        sum = self.get_list_from_req(req, "sum")
        count = self.get_list_from_req(req, "count")
        computed = self.get_list_from_req(req, "computed")
        groupBy = self.get_list_from_req(req, "groupBy", ["Time"])
        orderBy = self.get_list_from_req(req, "orderBy", ["Time"])

        maxRows = req.GET.get("maxRows", None)
        mergeDataModelSites = req.GET.get("mergeDataModelSites", None)

        maxAge = int(req.GET.get("maxAge", 60*60))

        cached = req.GET.get("cached", None)
        cachedGroupBy = self.get_list_from_req(req, "cachedGroupBy", ["doesnotexist"])

        filter = {}
        if slice:
            filter["slice"] = slice
        if site:
            filter["site"] = site
        if node:
            filter["hostname"] = node
        if event:
            filter["event"] = event
        if cp:
            filter["cp"] = cp

        q = self.compose_query(filter, timeBucket, avg, sum, count, computed, [], groupBy, orderBy, maxAge=maxAge)

        print q

        dataSourceUrl = "http://" + req.META["SERVER_NAME"] + ":" + req.META["SERVER_PORT"] + req.META["PATH_INFO"] + "?" + req.META["QUERY_STRING"].replace("format=","origFormat=").replace("%","%25") + "&format=charts"

        if (format=="dataSourceUrl"):
            result = {"dataSourceUrl": dataSourceUrl}
            return ("application/javascript", result)

        elif (format=="raw"):
            result = self.run_query_raw(q)
            result["dataSourceUrl"] = dataSourceUrl

            result = json.dumps(result)

            return ("application/javascript", result)

        elif (format=="nodata"):
            result = {"dataSourceUrl": dataSourceUrl, "query": q}
            result = json.dumps(result)
            # return a (content_type, body) tuple, not a set
            return ("application/javascript", result)

        elif (format=="charts"):
            bq_result = self.run_query_raw(q)

            # cloudscrutiny code is probably better!
            table = {}
            table["cols"] = self.schema_to_cols(bq_result["schema"])
            rows = []
            if "rows" in bq_result:
                for row in bq_result["rows"]:
                    rowcols = []
                    for (colnum,col) in enumerate(row["f"]):
                        if (colnum==0):
                            dt = datetime.datetime.fromtimestamp(float(col["v"]))
                            rowcols.append({"v": 'new Date("%s")' % dt.isoformat()})
                        else:
                            try:
                                rowcols.append({"v": float(col["v"])})
                            except:
                                rowcols.append({"v": col["v"]})
                    rows.append({"c": rowcols})
            table["rows"] = rows

            # tqx is a list of key:value pairs (e.g. "reqId:0"); extract reqId
            reqId = "0"
            if tqx:
                for part in tqx.split(";"):
                    if part.startswith("reqId:"):
                        reqId = part[len("reqId:"):]

            result = {"status": "okColumnChart", "reqId": reqId, "table": table, "version": "0.6"}

            result = "google.visualization.Query.setResponse(" + json.dumps(result) + ");"

            def unquote_it(x): return x.group()[1:-1].replace('\\"', '"')

            p = re.compile(r'"new Date\(\\"[^"]*\\"\)"')
            result = p.sub(unquote_it, result)

            return ("application/javascript", result)

        else:
            if cached:
                results = self.get_cached_query_results(self.compose_cached_query(cached))

                result = self.postprocess_results(results, filter=filter, sum=sum, count=count, avg=avg, computed=computed, maxDeltaTime=120, groupBy=cachedGroupBy)
            else:
                result = self.run_query(q)

            if maxRows:
                result = result[-int(maxRows):]

            if mergeDataModelSites:
                self.merge_datamodel_sites(result)

            return self.format_result(format, result, q, dataSourceUrl)

def DoPlanetStackAnalytics(request):
    bq = PlanetStackAnalytics()
    result = bq.process_request(request)

    return result

def main():
    bq = PlanetStackAnalytics(tableName="demoevents")

    q = bq.compose_cached_query()
    results = bq.run_query(q)

    #results = bq.postprocess_results(results,
    #                                 filter={"slice": "HyperCache"},
    #                                 groupBy=["site"],
    #                                 computed=["bytes_sent/elapsed"],
    #                                 sum=["bytes_sent", "computed_bytes_sent_div_elapsed"], avg=["cpu"],
    #                                 maxDeltaTime=60)

    #results = bq.postprocess_results(results, filter={"slice": "HyperCache"}, maxi=["cpu"], count=["hostname"], computed=["bytes_sent/elapsed"], groupBy=["Time", "site"], maxDeltaTime=80)

    results = bq.postprocess_results(results, filter={"event": "libvirt_heartbeat"}, avg=["cpu"], count=["hostname"], groupBy=["doesnotexist"])

    bq.dump_table(results)

    sys.exit(0)

    # The examples below are unreachable (they follow sys.exit(0)) and are kept
    # for reference.
    q = bq.compose_query(sum=["%bytes_sent"], avg=["%cpu"], latest=True, groupBy=["Time", "%site"])
    print q
    bq.dump_table(bq.run_query(q))

    q = bq.compose_query(avg=["%cpu","%bandwidth"], count=["%hostname"], filter={"slice": "HyperCache"})
    print q
    bq.dump_table(bq.run_query(q))

    q = bq.compose_query(computed=["%bytes_sent/%elapsed"])
    print
    print q
    bq.dump_table(bq.run_query(q))

    q = bq.compose_query(timeBucket=60*60, avg=["%cpu"], count=["%hostname"], computed=["%bytes_sent/%elapsed"])
    print
    print q
    bq.dump_table(bq.run_query(q))

if __name__ == "__main__":
    main()