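"""Analytics support for the PlanetStack HPC wizard.

PlanetStackAnalytics composes BigQuery SQL from HTTP request parameters,
optionally merges the results with the Django data model, and formats the
output for several consumers (JSON, HTML tables, Google Charts).
"""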
from bigquery_analytics import BigQueryAnalytics
import datetime
import re
import os
import sys
import time
import json
import traceback
import urllib2

if os.path.exists("/home/smbaker/projects/vicci/plstackapi/planetstack"):
    sys.path.append("/home/smbaker/projects/vicci/plstackapi/planetstack")
else:
    sys.path.append("/opt/planetstack")

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings")
from django.conf import settings
from django import db
from django.db import connection
from core.models import Slice, Sliver, ServiceClass, Reservation, Tag, Network, User, Node, Image, Deployment, Site, NetworkTemplate, NetworkSlice, Service

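# Load thresholds used by merge_datamodel_sites() when computing a site's
# 0..1 "hotness" value (BLUE_LOAD = cool, RED_LOAD = hot).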
BLUE_LOAD=5000000
RED_LOAD=15000000

glo_cached_queries = {}

class PlanetStackAnalytics(BigQueryAnalytics):
    def __init__(self, tableName=None):
        if not tableName:
            tableName = settings.BIGQUERY_TABLE

        BigQueryAnalytics.__init__(self, tableName)

    def service_to_sliceNames(self, serviceName):
        service=Service.objects.get(name=serviceName)
        try:
            slices = service.slices.all()
        except:
            # BUG in data model -- Slice.service has related name 'service' and
            #                      it should be 'slices'
            slices = service.service.all()

        return [slice.name for slice in slices]

    def compose_query(self, slice=None, site=None, node=None, service=None, event="libvirt_heartbeat", timeBucket="60", avg=[], sum=[], count=[], computed=[], val=[], groupBy=["Time"], orderBy=["Time"], tableName=None, latest=False, maxAge=60*60):
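        """ Build a BigQuery SQL string from the given parameters.

            slice, site, node, and service filter the rows; event selects the
            event type, and timeBucket groups samples into buckets of that
            many seconds. avg, sum, count, val, and computed name the fields
            to aggregate or select; a computed field is an "a/b" or "a*b"
            expression over SUMs. latest restricts the query to the most
            recent row per (hostname, event) pair, and maxAge (in seconds)
            bounds how far back in the table to scan. Field names prefixed
            with "%" are placeholders resolved by the BigQueryAnalytics layer.
        """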
        if tableName is None:
            tableName = self.tableName

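        # maxAge arrives in seconds but BigQuery table decorators take
        # milliseconds; [project.table@-N--1] restricts the scan to rows
        # added within the last N milliseconds.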
        maxAge = maxAge * 1000
        tablePart = "[%s.%s@-%d--1]" % ("vicci", tableName, maxAge)

        fields = []
        fieldNames = []
        srcFieldNames = ["time"]

        fields.append("SEC_TO_TIMESTAMP(INTEGER(TIMESTAMP_TO_SEC(time)/%s)*%s) as Time" % (str(timeBucket),str(timeBucket)))
        #fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/%s)*%s as Time" % (str(timeBucket),str(timeBucket)))

        for fieldName in avg:
            fields.append("AVG(%s) as avg_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("avg_%s" % fieldName.replace("%",""))
            srcFieldNames.append(fieldName)

        for fieldName in sum:
            fields.append("SUM(%s) as sum_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("sum_%s" % fieldName.replace("%",""))
            srcFieldNames.append(fieldName)

        for fieldName in count:
            fields.append("COUNT(distinct %s) as count_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("count_%s" % fieldName.replace("%",""))
            srcFieldNames.append(fieldName)

        for fieldName in val:
            fields.append(fieldName)
            fieldNames.append(fieldName)
            srcFieldNames.append(fieldName)

        for fieldName in computed:
            # a computed field is either "a/b" or "a*b"; check which operator
            # is present before indexing the second operand
            parts = fieldName.split("/")
            if len(parts)==1:
                operator = "*"
                parts = fieldName.split("*")
                computedFieldName = "computed_" + parts[0].replace("%","")+"_mult_"+parts[1].replace("%","")
            else:
                operator = "/"
                computedFieldName = "computed_" + parts[0].replace("%","")+"_div_"+parts[1].replace("%","")
            fields.append("SUM(%s)%sSUM(%s) as %s" % (parts[0], operator, parts[1], computedFieldName))
            fieldNames.append(computedFieldName)
            srcFieldNames.append(parts[0])
            srcFieldNames.append(parts[1])

        for fieldName in groupBy:
            if (fieldName not in ["Time"]):
                fields.append(fieldName)
                fieldNames.append(fieldName)
                srcFieldNames.append(fieldName)

        fields = ", ".join(fields)

        where = []

        if slice:
            where.append("%%slice='%s'" % slice)
        if site:
            where.append("%%site='%s'" % site)
        if node:
            where.append("%%hostname='%s'" % node)
        if event:
            where.append("event='%s'" % event)
        if service:
            sliceNames = self.service_to_sliceNames(service)
            if sliceNames:
                where.append("(" + " OR ".join(["%%slice='%s'" % sliceName for sliceName in sliceNames]) +")")

        if where:
            where = " WHERE " + " AND ".join(where)
        else:
            where = ""

        if groupBy:
            groupBySub = " GROUP BY " + ",".join(groupBy + ["%hostname"])
            groupBy = " GROUP BY " + ",".join(groupBy)
        else:
            groupBySub = " GROUP BY %hostname"
            groupBy = ""

        if orderBy:
            orderBy = " ORDER BY " + ",".join(orderBy)
        else:
            orderBy = ""

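        # when latest is set, rewrite tablePart as a self-join that keeps only
        # the most recent row for each (hostname, event) pair, so the
        # aggregates below see a single sample per host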
        if latest:
            latestFields = ["table1.%s as %s" % (x,x) for x in srcFieldNames]
            latestFields = ", ".join(latestFields)
            tablePart = """(SELECT %s FROM %s AS table1
                            JOIN
                                (SELECT %%hostname, event, max(time) as maxtime from %s GROUP BY %%hostname, event) AS latest
                            ON
                                table1.%%hostname = latest.%%hostname AND table1.event = latest.event AND table1.time = latest.maxtime)""" % (latestFields, tablePart, tablePart)

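        # computed fields need two passes: an inner query aggregates per
        # hostname, then an outer query re-aggregates those per-host values
        # across the requested groupBy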
        if computed:
            subQuery = "SELECT %%hostname, %s FROM %s" % (fields, tablePart)
            if where:
                subQuery = subQuery + where
            subQuery = subQuery + groupBySub

            sumFields = []
            for fieldName in fieldNames:
                if fieldName.startswith("avg"):
                    sumFields.append("AVG(%s) as avg_%s"%(fieldName,fieldName))
                    sumFields.append("MAX(%s) as max_%s"%(fieldName,fieldName))
                elif (fieldName.startswith("count")) or (fieldName.startswith("sum")) or (fieldName.startswith("computed")):
                    sumFields.append("SUM(%s) as sum_%s"%(fieldName,fieldName))
                else:
                    sumFields.append(fieldName)

            sumFields = ",".join(sumFields)

            query = "SELECT %s, %s FROM (%s)" % ("Time", sumFields, subQuery)
            if groupBy:
                query = query + groupBy
            if orderBy:
                query = query + orderBy
        else:
            query = "SELECT %s FROM %s" % (fields, tablePart)
            if where:
                query = query + where
            if groupBy:
                query = query + groupBy
            if orderBy:
                query = query + orderBy

        return query

    def get_list_from_req(self, req, name, default=None):
        value = req.GET.get(name, None)
        if not value:
            return default if (default is not None) else []
        # "@" may be used in place of "%" to avoid URL-escaping issues
        value = value.replace("@","%")
        return value.split(",")

    def format_result(self, format, result, query, dataSourceUrl):
        if (format == "json_dicts"):
            result = {"query": query, "rows": result, "dataSourceUrl": dataSourceUrl}
            return ("application/javascript", json.dumps(result))

        elif (format == "json_arrays"):
            new_result = []
            for row in result:
                new_row = []
                for key in sorted(row.keys()):
                    new_row.append(row[key])
                new_result.append(new_row)
            # wrap the rows once, after the loop completes
            new_result = {"query": query, "rows": new_result}
            return ("application/javascript", json.dumps(new_result))

        elif (format == "html_table"):
            new_rows = []
            for row in result:
                new_row = []
                for key in sorted(row.keys()):
                    new_row.append("<TD>%s</TD>" % str(row[key]))
                new_rows.append("<TR>%s</TR>" % "".join(new_row))

            new_result = "<TABLE>%s</TABLE>" % "\n".join(new_rows)

            return ("text/html", new_result)

    def merge_datamodel_sites(self, rows, slice=None):
        """ For a query that included "site" in its groupby, merge in the
            opencloud site information.
        """

        if slice:
            try:
                slice = Slice.objects.get(name=slice)
            except:
                slice = None

        for row in rows:
            sitename = row["site"]
            try:
                model_site = Site.objects.get(name=sitename)
            except:
                # we didn't find it in the data model
                continue

            allocated_slivers = 0
            if model_site and slice:
                for sliver in slice.slivers.all():
                    if sliver.node.site == model_site:
                        allocated_slivers = allocated_slivers + 1

            row["lat"] = float(model_site.location.latitude)
            row["long"] = float(model_site.location.longitude)
            row["url"] = model_site.site_url
            row["numNodes"] = model_site.nodes.count()
            row["allocated_slivers"] = allocated_slivers

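            # map CPU load onto a 0..1 "hotness" scale: cpu is the max CPU
            # percentage scaled to 0..1, and hotness rises linearly from 0 at
            # BLUE_LOAD to 1.0 at RED_LOAD, clamped below at zero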
            max_cpu = row.get("max_avg_cpu", row.get("max_cpu",0))
            cpu=float(max_cpu)/100.0
            row["hotness"] = max(0.0, ((cpu*RED_LOAD) - BLUE_LOAD)/(RED_LOAD-BLUE_LOAD))

    def compose_latest_query(self, fieldNames=None, groupByFields=["%hostname", "event"]):
        """ Compose a query that returns the 'most recent' row for each (hostname, event)
            pair.
        """

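        # the same "most recent row per group" self-join used by
        # compose_query(latest=True); the @-3600000--1 table decorator limits
        # both sides of the join to the last hour of data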
        if not fieldNames:
            fieldNames = ["%hostname", "%bytes_sent", "%bytes_hit", "%healthy", "time", "event", "%site", "%elapsed", "%slice", "%cpu"]

        fields = ["table1.%s AS %s" % (x,x) for x in fieldNames]
        fields = ", ".join(fields)

        tableDesc = "%s.%s" % (self.projectName, self.tableName)

        groupByOn = ["table1.time = latest.maxtime"]
        for field in groupByFields:
            groupByOn.append("table1.%s = latest.%s" % (field, field))

        groupByOn = " AND ".join(groupByOn)
        groupByFields = ", ".join(groupByFields)

        base_query = "SELECT %s FROM [%s@-3600000--1] AS table1 JOIN (SELECT %s, max(time) as maxtime from [%s@-3600000--1] GROUP BY %s) AS latest ON %s" % \
                      (fields, tableDesc, groupByFields, tableDesc, groupByFields, groupByOn)

        return base_query

    def get_cached_query_results(self, q, wait=True):
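        """ Return the cached rows for q if they were fetched less than 60
            seconds ago; otherwise re-run the query and refresh the cache,
            or return None immediately if wait is False.
        """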
        global glo_cached_queries

        if q in glo_cached_queries:
            if (time.time() - glo_cached_queries[q]["time"]) <= 60:
                print "using cached query"
                return glo_cached_queries[q]["rows"]

        if not wait:
            return None

        print "refreshing cached query"
        result = self.run_query(q)
        glo_cached_queries[q] = {"time": time.time(), "rows": result}

        return result

    def process_request(self, req):
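        """ Handle an analytics HTTP request. The GET parameters select the
            filters and aggregates handed to compose_query(), and "format"
            picks the response encoding: json_dicts, json_arrays, html_table,
            dataSourceUrl, raw, nodata, or charts. Returns a (content_type,
            content) tuple.
        """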
        print req.GET

        tqx = req.GET.get("tqx", None)

        slice = req.GET.get("slice", None)
        site = req.GET.get("site", None)
        node = req.GET.get("node", None)
        service = req.GET.get("service", None)
        event = req.GET.get("event", "libvirt_heartbeat")

        format = req.GET.get("format", "json_dicts")

        timeBucket = int(req.GET.get("timeBucket", 60))
        avg = self.get_list_from_req(req, "avg")
        sum = self.get_list_from_req(req, "sum")
        count = self.get_list_from_req(req, "count")
        computed = self.get_list_from_req(req, "computed")
        groupBy = self.get_list_from_req(req, "groupBy", ["Time"])
        orderBy = self.get_list_from_req(req, "orderBy", ["Time"])

        maxRows = req.GET.get("maxRows", None)
        mergeDataModelSites = req.GET.get("mergeDataModelSites", None)

        maxAge = int(req.GET.get("maxAge", 60*60))

        cached = req.GET.get("cached", None)
        cachedGroupBy = self.get_list_from_req(req, "cachedGroupBy", ["doesnotexist"])

        q = self.compose_query(slice, site, node, service, event, timeBucket, avg, sum, count, computed, [], groupBy, orderBy, maxAge=maxAge)

        print q

        dataSourceUrl = "http://" + req.META["SERVER_NAME"] + ":" + req.META["SERVER_PORT"] + req.META["PATH_INFO"] + "?" + req.META["QUERY_STRING"].replace("format=","origFormat=").replace("%","%25") + "&format=charts"

        if (format=="dataSourceUrl"):
            result = {"dataSourceUrl": dataSourceUrl}
            return ("application/javascript", result)

        elif (format=="raw"):
            result = self.run_query_raw(q)
            result["dataSourceUrl"] = dataSourceUrl

            result = json.dumps(result)

            return ("application/javascript", result)

        elif (format=="nodata"):
            result = {"dataSourceUrl": dataSourceUrl, "query": q}
            result = json.dumps(result)
            # return a (content_type, body) tuple, not a set
            return ("application/javascript", result)

        elif (format=="charts"):
            bq_result = self.run_query_raw(q)

            # cloudscrutiny code is probably better!
            table = {}
            table["cols"] = self.schema_to_cols(bq_result["schema"])
            rows = []
            if "rows" in bq_result:
                for row in bq_result["rows"]:
                    rowcols = []
                    for (colnum,col) in enumerate(row["f"]):
                        if (colnum==0):
                            dt = datetime.datetime.fromtimestamp(float(col["v"]))
                            rowcols.append({"v": 'new Date("%s")' % dt.isoformat()})
                        else:
                            try:
                                rowcols.append({"v": float(col["v"])})
                            except:
                                rowcols.append({"v": col["v"]})
                    rows.append({"c": rowcols})
            table["rows"] = rows

            # tqx is a list of "key:value" pairs, e.g. "reqId:0"; extract the
            # reqId value
            reqId = "0"
            if tqx:
                for part in tqx.split(";"):
                    if part.startswith("reqId:"):
                        reqId = part[len("reqId:"):]

            result = {"status": "okColumnChart", "reqId": reqId, "table": table, "version": "0.6"}

            result = "google.visualization.Query.setResponse(" + json.dumps(result) + ");"

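            # json.dumps cannot emit a bare javascript expression, so the Date
            # constructors were encoded as strings above; strip the quotes so
            # the client receives new Date(...) values rather than strings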
            def unquote_it(x): return x.group()[1:-1].replace('\\"', '"')

            p = re.compile(r'"new Date\(\\"[^"]*\\"\)"')
            result = p.sub(unquote_it, result)

            return ("application/javascript", result)

        else:
            if cached:
                results = self.get_cached_query_results(self.compose_latest_query())

                filter={}
                if slice:
                    filter["slice"] = slice
                if site:
                    filter["site"] = site
                if node:
                    filter["hostname"] = node
                if event:
                    filter["event"] = event

                result = self.postprocess_results(results, filter=filter, sum=sum, count=count, avg=avg, computed=computed, maxDeltaTime=120, groupBy=cachedGroupBy)
            else:
                result = self.run_query(q)

            if maxRows:
                result = result[-int(maxRows):]

            if mergeDataModelSites:
                self.merge_datamodel_sites(result)

            return self.format_result(format, result, q, dataSourceUrl)

def DoPlanetStackAnalytics(request):
    bq = PlanetStackAnalytics()
    result = bq.process_request(request)

    return result

def main():
    bq = PlanetStackAnalytics(tableName="demoevents")

    q = bq.compose_latest_query(groupByFields=["%hostname", "event", "%slice"])
    results = bq.run_query(q)

    #results = bq.postprocess_results(results,
    #                                 filter={"slice": "HyperCache"},
    #                                 groupBy=["site"],
    #                                 computed=["bytes_sent/elapsed"],
    #                                 sum=["bytes_sent", "computed_bytes_sent_div_elapsed"], avg=["cpu"],
    #                                 maxDeltaTime=60)

    results = bq.postprocess_results(results, filter={"slice": "HyperCache"}, maxi=["cpu"], count=["hostname"], computed=["bytes_sent/elapsed"], groupBy=["Time", "site"], maxDeltaTime=80)

    bq.dump_table(results)

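    # everything below the sys.exit() is unreachable; the remaining
    # compose_query() examples are kept for reference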
    sys.exit(0)

    q=bq.compose_query(sum=["%bytes_sent"], avg=["%cpu"], latest=True, groupBy=["Time", "%site"])
    print q
    bq.dump_table(bq.run_query(q))

    q=bq.compose_query(avg=["%cpu","%bandwidth"], count=["%hostname"], slice="HyperCache")
    print q
    bq.dump_table(bq.run_query(q))

    q=bq.compose_query(computed=["%bytes_sent/%elapsed"])
    print
    print q
    bq.dump_table(bq.run_query(q))

    q=bq.compose_query(timeBucket=60*60, avg=["%cpu"], count=["%hostname"], computed=["%bytes_sent/%elapsed"])
    print
    print q
    bq.dump_table(bq.run_query(q))

if __name__ == "__main__":
    main()