plstackapi.git: planetstack/hpc_wizard/planetstack_analytics.py
from bigquery_analytics import BigQueryAnalytics, BIGQUERY_AVAILABLE
import datetime
import re
import os
import sys
import time
import json
import traceback
import urllib2

if os.path.exists("/home/smbaker/projects/vicci/plstackapi/planetstack"):
    sys.path.append("/home/smbaker/projects/vicci/plstackapi/planetstack")
else:
    sys.path.append("/opt/planetstack")

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings")
from django.conf import settings
from django import db
from django.db import connection
from core.models import Slice, Sliver, ServiceClass, Reservation, Tag, Network, User, Node, Image, Deployment, Site, NetworkTemplate, NetworkSlice, Service

BLUE_LOAD=5000000
RED_LOAD=15000000

glo_cached_queries = {}

class PlanetStackAnalytics(BigQueryAnalytics):
    def __init__(self, tableName=None):
        if not tableName:
            tableName = settings.BIGQUERY_TABLE

        BigQueryAnalytics.__init__(self, tableName)

    def service_to_sliceNames(self, serviceName):
        service=Service.objects.get(name=serviceName)
        try:
            slices = service.slices.all()
        except:
            # BUG in data model -- Slice.service has related name 'service' and
            #                      it should be 'slices'
            slices = service.service.all()

        return [slice.name for slice in slices]

    def compose_query(self, filter={}, timeBucket="60", avg=[], sum=[], count=[], computed=[], val=[], groupBy=["Time"], orderBy=["Time"], tableName=None, latest=False, maxAge=60*60):
        if tableName is None:
            tableName = self.tableName

        maxAge = maxAge * 1000
        tablePart = "[%s.%s@-%d--1]" % ("vicci", tableName, maxAge)

        fields = []
        fieldNames = []
        srcFieldNames = ["time"]

        fields.append("SEC_TO_TIMESTAMP(INTEGER(TIMESTAMP_TO_SEC(time)/%s)*%s) as Time" % (str(timeBucket),str(timeBucket)))
        #fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/%s)*%s as Time" % (str(timeBucket),str(timeBucket)))

        for fieldName in avg:
            fields.append("AVG(%s) as avg_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("avg_%s" % fieldName.replace("%",""))
            srcFieldNames.append(fieldName)

        for fieldName in sum:
            fields.append("SUM(%s) as sum_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("sum_%s" % fieldName.replace("%",""))
            srcFieldNames.append(fieldName)

        for fieldName in count:
            fields.append("COUNT(distinct %s) as count_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("count_%s" % fieldName.replace("%",""))
            srcFieldNames.append(fieldName)

        for fieldName in val:
            fields.append(fieldName)
            fieldNames.append(fieldName)
            srcFieldNames.append(fieldName)

        for fieldName in computed:
            # A computed field is either "a/b" or "a*b"; check for the divide
            # form first, then fall back to multiplication if no "/" is present.
            parts = fieldName.split("/")
            if len(parts) == 2:
                operator = "/"
                computedFieldName = "computed_" + parts[0].replace("%","") + "_div_" + parts[1].replace("%","")
            else:
                operator = "*"
                parts = fieldName.split("*")
                computedFieldName = "computed_" + parts[0].replace("%","") + "_mult_" + parts[1].replace("%","")
            fields.append("SUM(%s)%sSUM(%s) as %s" % (parts[0], operator, parts[1], computedFieldName))
            fieldNames.append(computedFieldName)
            srcFieldNames.append(parts[0])
            srcFieldNames.append(parts[1])

        for fieldName in groupBy:
            if (fieldName not in ["Time"]):
                fields.append(fieldName)
                fieldNames.append(fieldName)
                srcFieldNames.append(fieldName)

        fields = ", ".join(fields)

        where = []

        if filter.get("slice",None):
            where.append("%%slice='%s'" % filter["slice"])
        if filter.get("site",None):
            where.append("%%site='%s'" % filter["site"])
        # process_request() stores the node name under "hostname" while other
        # callers use "node"; accept either key so the filter is not silently dropped
        node = filter.get("node", filter.get("hostname", None))
        if node:
            where.append("%%hostname='%s'" % node)
        if filter.get("event",None):
            where.append("event='%s'" % filter["event"])
        if filter.get("service",None):
            sliceNames = self.service_to_sliceNames(filter["service"])
            if sliceNames:
                where.append("(" + " OR ".join(["%%slice='%s'" % sliceName for sliceName in sliceNames]) +")")

        if where:
            where = " WHERE " + " AND ".join(where)
        else:
            where =""

        if groupBy:
            groupBySub = " GROUP BY " + ",".join(groupBy + ["%hostname"])
            groupBy = " GROUP BY " + ",".join(groupBy)
        else:
            groupBySub = " GROUP BY %hostname"
            groupBy = ""

        if orderBy:
            orderBy = " ORDER BY " + ",".join(orderBy)
        else:
            orderBy = ""

        if latest:
            latestFields = ["table1.%s as %s" % (x,x) for x in srcFieldNames]
            latestFields = ", ".join(latestFields)
            tablePart = """(SELECT %s FROM %s AS table1
                            JOIN
                                (SELECT %%hostname, event, max(time) as maxtime from %s GROUP BY %%hostname, event) AS latest
                            ON
                                table1.%%hostname = latest.%%hostname AND table1.event = latest.event AND table1.time = latest.maxtime)""" % (latestFields, tablePart, tablePart)

        if computed:
            subQuery = "SELECT %%hostname, %s FROM %s" % (fields, tablePart)
            if where:
                subQuery = subQuery + where
            subQuery = subQuery + groupBySub

            sumFields = []
            for fieldName in fieldNames:
                if fieldName.startswith("avg"):
                    sumFields.append("AVG(%s) as avg_%s"%(fieldName,fieldName))
                    sumFields.append("MAX(%s) as max_%s"%(fieldName,fieldName))
                elif (fieldName.startswith("count")) or (fieldName.startswith("sum")) or (fieldName.startswith("computed")):
                    sumFields.append("SUM(%s) as sum_%s"%(fieldName,fieldName))
                else:
                    sumFields.append(fieldName)

            sumFields = ",".join(sumFields)

            query = "SELECT %s, %s FROM (%s)" % ("Time", sumFields, subQuery)
            if groupBy:
                query = query + groupBy
            if orderBy:
                query = query + orderBy
        else:
            query = "SELECT %s FROM %s" % (fields, tablePart)
            if where:
                query = query + " " + where
            if groupBy:
                query = query + groupBy
            if orderBy:
                query = query + orderBy

        return query

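    # Illustrative sketch (not from the original source), assuming the
    # "demoevents" table used by main() below: a call such as
    #
    #   bq.compose_query(filter={"slice": "HyperCache"}, timeBucket=60,
    #                    avg=["%cpu"], count=["%hostname"])
    #
    # assembles a BigQuery statement along the lines of
    #
    #   SELECT SEC_TO_TIMESTAMP(INTEGER(TIMESTAMP_TO_SEC(time)/60)*60) as Time,
    #          AVG(%cpu) as avg_cpu, COUNT(distinct %hostname) as count_hostname
    #   FROM [vicci.demoevents@-3600000--1]
    #   WHERE %slice='HyperCache'
    #   GROUP BY Time ORDER BY Time
    #
    # The "%"-prefixed names are placeholders that the BigQueryAnalytics base
    # class is assumed to map to the real column names before the query runs.
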
    def get_list_from_req(self, req, name, default=[]):
        value = req.GET.get(name, None)
        if not value:
            return default
        value=value.replace("@","%")
        return value.split(",")

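    # Illustrative note (assumption, not part of the original file): since "%"
    # is awkward in a URL query string, callers can write "@" instead; e.g.
    # "?avg=@cpu,@bandwidth" arrives here and becomes the list
    # ["%cpu", "%bandwidth"] before being handed to compose_query().
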
    def format_result(self, format, result, query, dataSourceUrl):
        if not BIGQUERY_AVAILABLE:
            msg = "BigQuery Statistics Unavailable"
        else:
            msg = None

        if (format == "json_dicts"):
            result = {"query": query, "rows": result, "dataSourceUrl": dataSourceUrl, "msg": msg}
            return ("application/javascript", json.dumps(result))

        elif (format == "json_arrays"):
            new_result = []
            for row in result:
                new_row = []
                for key in sorted(row.keys()):
                    new_row.append(row[key])
                new_result.append(new_row)
            # build the wrapper dict after all rows have been collected;
            # doing it inside the loop clobbered new_result on the first pass
            new_result = {"query": query, "rows": new_result, "msg": msg}
            return ("application/javascript", json.dumps(new_result))

        elif (format == "html_table"):
            new_rows = []
            for row in result:
                new_row = []
                for key in sorted(row.keys()):
                    new_row.append("<TD>%s</TD>" % str(row[key]))
                new_rows.append("<TR>%s</TR>" % "".join(new_row))

            new_result = "<TABLE>%s</TABLE>" % "\n".join(new_rows)

            return ("text/html", new_result)

    def merge_datamodel_sites(self, rows, slice=None):
        """ For a query that included "site" in its groupby, merge in the
            opencloud site information.
        """

        if slice:
            try:
                slice = Slice.objects.get(name=slice)
            except:
                slice = None

        for row in rows:
            sitename = row["site"]
            try:
                model_site = Site.objects.get(name=sitename)
            except:
                # we didn't find it in the data model
                continue

            allocated_slivers = 0
            if model_site and slice:
                for sliver in slice.slivers.all():
                    if sliver.node.site == model_site:
                        allocated_slivers = allocated_slivers + 1

            row["lat"] = float(model_site.location.latitude)
            row["long"] = float(model_site.location.longitude)
            row["url"] = model_site.site_url
            row["numNodes"] = model_site.nodes.count()
            row["allocated_slivers"] = allocated_slivers

            max_cpu = row.get("max_avg_cpu", row.get("max_cpu",0))
            cpu=float(max_cpu)/100.0
            row["hotness"] = max(0.0, ((cpu*RED_LOAD) - BLUE_LOAD)/(RED_LOAD-BLUE_LOAD))

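    # Illustrative sketch of the merge (assumption, not in the original file):
    # a row such as {"site": "princeton", "max_avg_cpu": 45} is augmented in
    # place with data-model fields, roughly
    #
    #   {"site": "princeton", "max_avg_cpu": 45,
    #    "lat": 40.35, "long": -74.66, "url": "http://princeton.edu",
    #    "numNodes": 10, "allocated_slivers": 2, "hotness": 0.175}
    #
    # The site name, coordinates, and counts above are made-up values; the
    # hotness 0.175 follows from max_avg_cpu=45 via the BLUE_LOAD/RED_LOAD
    # scale: (0.45*15000000 - 5000000) / (15000000 - 5000000).
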
    def compose_cached_query(self, querySpec='default'):
        """ Compose a query that returns the 'most recent' row for each (hostname, event)
            pair.

            Note that groupByFields cannot contain any values that are NULL, or those
            rows will be excluded. For example, if groupByFields includes cp, then
            there will be no libvirt_event rows, since libvirt_event does not have
            cp.

            This means we can't really have 'one query to rule them all'. Instead we
            settle on a couple of different queries and have the caller specify
            which one they want.
        """

        fieldNames = ["%hostname", "%bytes_sent", "%bytes_hit", "%healthy", "time", "event", "%site", "%elapsed", "%cpu"]

        if querySpec=="default":
            groupByFields = ["%hostname", "event"]
        elif (querySpec=="hpc"):
            fieldNames.append("%cp")
            groupByFields = ["%hostname", "event", "%cp"]
        else:
            raise ValueError("Unknown queryspec %s" % querySpec)

        fields = ["table1.%s AS %s" % (x,x) for x in fieldNames]
        fields = ", ".join(fields)

        tableDesc = "%s.%s" % (self.projectName, self.tableName)

        groupByOn = ["table1.time = latest.maxtime"]
        for field in groupByFields:
            groupByOn.append("table1.%s = latest.%s" % (field, field))

        groupByOn = " AND ".join(groupByOn)
        groupByFields = ", ".join(groupByFields)

        base_query = "SELECT %s FROM [%s@-3600000--1] AS table1 JOIN (SELECT %s, max(time) as maxtime from [%s@-3600000--1] GROUP BY %s) AS latest ON %s" % \
                      (fields, tableDesc, groupByFields, tableDesc, groupByFields, groupByOn)

        return base_query

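    # Rough shape of the generated statement (illustrative only, "default"
    # querySpec): a self-join that keeps, for every (%hostname, event) pair,
    # only the row whose time equals that pair's max(time) over the last hour
    # ("@-3600000--1" is a one-hour table decorator, in milliseconds):
    #
    #   SELECT table1.%hostname AS %hostname, ..., table1.%cpu AS %cpu
    #   FROM [project.table@-3600000--1] AS table1
    #   JOIN (SELECT %hostname, event, max(time) as maxtime
    #         FROM [project.table@-3600000--1]
    #         GROUP BY %hostname, event) AS latest
    #   ON table1.time = latest.maxtime
    #      AND table1.%hostname = latest.%hostname AND table1.event = latest.event
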
    def get_cached_query_results(self, q, wait=True):
        global glo_cached_queries

        if q in glo_cached_queries:
            if (time.time() - glo_cached_queries[q]["time"]) <= 60:
                print "using cached query"
                return glo_cached_queries[q]["rows"]

        if not wait:
            return None

        print "refreshing cached query"
        result = self.run_query(q)
        glo_cached_queries[q] = {"time": time.time(), "rows": result}

        return result

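    # Usage sketch (assumption, not part of the original file): results are
    # memoized per query string for 60 seconds in the module-level
    # glo_cached_queries dict, so a polling caller might do
    #
    #   rows = bq.get_cached_query_results(bq.compose_cached_query("hpc"))
    #
    # and pass wait=False when it would rather get None than block on a fresh
    # BigQuery round trip.
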
    def process_request(self, req):
        print req.GET

        tqx = req.GET.get("tqx", None)

        slice = req.GET.get("slice", None)
        site = req.GET.get("site", None)
        node = req.GET.get("node", None)
        service = req.GET.get("service", None)
        event = req.GET.get("event", "libvirt_heartbeat")
        cp = req.GET.get("cp", None)

        format = req.GET.get("format", "json_dicts")

        timeBucket = int(req.GET.get("timeBucket", 60))
        avg = self.get_list_from_req(req, "avg")
        sum = self.get_list_from_req(req, "sum")
        count = self.get_list_from_req(req, "count")
        computed = self.get_list_from_req(req, "computed")
        groupBy = self.get_list_from_req(req, "groupBy", ["Time"])
        orderBy = self.get_list_from_req(req, "orderBy", ["Time"])

        maxRows = req.GET.get("maxRows", None)
        mergeDataModelSites = req.GET.get("mergeDataModelSites", None)

        maxAge = int(req.GET.get("maxAge", 60*60))

        cached = req.GET.get("cached", None)
        cachedGroupBy = self.get_list_from_req(req, "cachedGroupBy", ["doesnotexist"])

        filter={}
        if slice:
            filter["slice"] = slice
        if site:
            filter["site"] = site
        if node:
            filter["hostname"] = node
        if event:
            filter["event"] = event
        if cp:
            filter["cp"] = cp

        q = self.compose_query(filter, timeBucket, avg, sum, count, computed, [], groupBy, orderBy, maxAge=maxAge)

        print q

        dataSourceUrl = "http://" + req.META["SERVER_NAME"] + ":" + req.META["SERVER_PORT"] + req.META["PATH_INFO"] + "?" + req.META["QUERY_STRING"].replace("format=","origFormat=").replace("%","%25") + "&format=charts"

        if (format=="dataSourceUrl"):
            result = {"dataSourceUrl": dataSourceUrl}
            return ("application/javascript", result)

        elif (format=="raw"):
            result = self.run_query_raw(q)
            result["dataSourceUrl"] = dataSourceUrl

            result = json.dumps(result)

            return ("application/javascript", result)

        elif (format=="nodata"):
            result = {"dataSourceUrl": dataSourceUrl, "query": q}
            result = json.dumps(result)
            # return a (content_type, body) tuple like the other branches, not a set
            return ("application/javascript", result)

        elif (format=="charts"):
            bq_result = self.run_query_raw(q)

            # cloudscrutiny code is probably better!
            table = {}
            table["cols"] = self.schema_to_cols(bq_result["schema"])
            rows = []
            if "rows" in bq_result:
                for row in bq_result["rows"]:
                    rowcols = []
                    for (colnum,col) in enumerate(row["f"]):
                        if (colnum==0):
                            dt = datetime.datetime.fromtimestamp(float(col["v"]))
                            rowcols.append({"v": 'new Date("%s")' % dt.isoformat()})
                        else:
                            try:
                                rowcols.append({"v": float(col["v"])})
                            except:
                                rowcols.append({"v": col["v"]})
                    rows.append({"c": rowcols})
            table["rows"] = rows

            # tqx is a semicolon-separated list of key:value pairs (e.g. "reqId:1");
            # str.strip() strips a set of characters rather than a prefix, so
            # parse the value instead of stripping it
            reqId = "0"
            if tqx:
                for part in tqx.split(";"):
                    if part.startswith("reqId:"):
                        reqId = part[len("reqId:"):]

            result = {"status": "okColumnChart", "reqId": reqId, "table": table, "version": "0.6"}

            result = "google.visualization.Query.setResponse(" + json.dumps(result) + ");"

            def unquote_it(x): return x.group()[1:-1].replace('\\"', '"')

            p = re.compile(r'"new Date\(\\"[^"]*\\"\)"')
            result=p.sub(unquote_it, result)

            return ("application/javascript", result)

        else:
            if cached:
                results = self.get_cached_query_results(self.compose_cached_query(cached))

                result = self.postprocess_results(results, filter=filter, sum=sum, count=count, avg=avg, computed=computed, maxDeltaTime=120, groupBy=cachedGroupBy)
            else:
                result = self.run_query(q)

            if maxRows:
                result = result[-int(maxRows):]

            if mergeDataModelSites:
                self.merge_datamodel_sites(result)

            return self.format_result(format, result, q, dataSourceUrl)

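# Illustrative request (assumption, not in the original file): a dashboard
# could hit this view with something like
#
#   /analytics/bigquery/?avg=@cpu&count=@hostname&groupBy=Time,@site
#       &slice=HyperCache&format=json_dicts&maxAge=3600
#
# The URL path shown here is made up; the query parameters map onto
# process_request() above, with "@" standing in for "%" in field names.
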
def DoPlanetStackAnalytics(request):
    bq = PlanetStackAnalytics()
    result = bq.process_request(request)

    return result

def main():
    bq = PlanetStackAnalytics(tableName="demoevents")

    q = bq.compose_cached_query()
    results = bq.run_query(q)

    #results = bq.postprocess_results(results,
    #                                 filter={"slice": "HyperCache"},
    #                                 groupBy=["site"],
    #                                 computed=["bytes_sent/elapsed"],
    #                                 sum=["bytes_sent", "computed_bytes_sent_div_elapsed"], avg=["cpu"],
    #                                 maxDeltaTime=60)

    #results = bq.postprocess_results(results, filter={"slice": "HyperCache"}, maxi=["cpu"], count=["hostname"], computed=["bytes_sent/elapsed"], groupBy=["Time", "site"], maxDeltaTime=80)

    results = bq.postprocess_results(results,filter={"event": "libvirt_heartbeat"}, avg=["cpu"], count=["hostname"], groupBy=["doesnotexist"])

    bq.dump_table(results)

    sys.exit(0)

    # NOTE: the calls below are unreachable because of the sys.exit() above;
    # they are kept as sample invocations of compose_query().
    q=bq.compose_query(sum=["%bytes_sent"], avg=["%cpu"], latest=True, groupBy=["Time", "%site"])
    print q
    bq.dump_table(bq.run_query(q))

    q=bq.compose_query(avg=["%cpu","%bandwidth"], count=["%hostname"], filter={"slice": "HyperCache"})
    print q
    bq.dump_table(bq.run_query(q))

    q=bq.compose_query(computed=["%bytes_sent/%elapsed"])
    print
    print q
    bq.dump_table(bq.run_query(q))

    q=bq.compose_query(timeBucket=60*60, avg=["%cpu"], count=["%hostname"], computed=["%bytes_sent/%elapsed"])
    print
    print q
    bq.dump_table(bq.run_query(q))

if __name__ == "__main__":
    main()