import datetime
import json
import os
import re
import sys
import time

from bigquery_analytics import BigQueryAnalytics, BIGQUERY_AVAILABLE
if os.path.exists("/home/smbaker/projects/vicci/plstackapi/planetstack"):
    sys.path.append("/home/smbaker/projects/vicci/plstackapi/planetstack")
else:
    sys.path.append("/opt/planetstack")

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings")
from django.conf import settings
from django.db import connection
from core.models import Slice, Sliver, ServiceClass, Reservation, Tag, Network, User, Node, Image, Deployment, Site, NetworkTemplate, NetworkSlice, Service
# Load thresholds for the site "hotness" computation in merge_datamodel_sites()
BLUE_LOAD = 5000000
RED_LOAD = 10000000

glo_cached_queries = {}
class PlanetStackAnalytics(BigQueryAnalytics):
    def __init__(self, tableName=None):
        if not tableName:
            tableName = settings.BIGQUERY_TABLE

        BigQueryAnalytics.__init__(self, tableName)
    def service_to_sliceNames(self, serviceName):
        service = Service.objects.get(name=serviceName)
        try:
            slices = service.slices.all()
        except:
            # BUG in data model -- Slice.service has related name 'service' and
            # it should be 'slices'
            slices = service.service.all()

        return [slice.name for slice in slices]
    def compose_query(self, filter={}, timeBucket="60", avg=[], sum=[], count=[], computed=[], val=[], groupBy=["Time"], orderBy=["Time"], tableName=None, latest=False, maxAge=60*60):
        if not tableName:
            tableName = self.tableName
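        # BigQuery legacy-SQL range decorators ("[project.table@-N--1]") restrict
        # the scan to rows added between N milliseconds ago and now, so maxAge is
        # converted from seconds to milliseconds before being used below.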
        maxAge = maxAge * 1000
        tablePart = "[%s.%s@-%d--1]" % ("vicci", tableName, maxAge)
        fields = []
        fieldNames = []
        srcFieldNames = ["time"]
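        # Bucket samples into timeBucket-second intervals: truncate epoch seconds
        # to a multiple of timeBucket, then convert back to a TIMESTAMP.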
        fields.append("SEC_TO_TIMESTAMP(INTEGER(TIMESTAMP_TO_SEC(time)/%s)*%s) as Time" % (str(timeBucket), str(timeBucket)))
        #fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/%s)*%s as Time" % (str(timeBucket),str(timeBucket)))
        for fieldName in avg:
            fields.append("AVG(%s) as avg_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("avg_%s" % fieldName.replace("%",""))
            srcFieldNames.append(fieldName)
        for fieldName in sum:
            fields.append("SUM(%s) as sum_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("sum_%s" % fieldName.replace("%",""))
            srcFieldNames.append(fieldName)
        for fieldName in count:
            fields.append("COUNT(distinct %s) as count_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("count_%s" % fieldName.replace("%",""))
            srcFieldNames.append(fieldName)
        for fieldName in val:
            fields.append(fieldName)
            fieldNames.append(fieldName)
            srcFieldNames.append(fieldName)
        for fieldName in computed:
            if "/" in fieldName:
                operator = "/"
                parts = fieldName.split("/")
                computedFieldName = "computed_" + parts[0].replace("%","") + "_div_" + parts[1].replace("%","")
            else:
                operator = "*"
                # was computed.split("*"), which would fail on a list
                parts = fieldName.split("*")
                computedFieldName = "computed_" + parts[0].replace("%","") + "_mult_" + parts[1].replace("%","")
            fields.append("SUM(%s)%sSUM(%s) as %s" % (parts[0], operator, parts[1], computedFieldName))
            fieldNames.append(computedFieldName)
            srcFieldNames.append(parts[0])
            srcFieldNames.append(parts[1])
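        # For example, computed=["%bytes_sent/%elapsed"] emits
        #   SUM(%bytes_sent)/SUM(%elapsed) as computed_bytes_sent_div_elapsed
        # ("%"-prefixed names are placeholders, presumably resolved to real column
        # names by BigQueryAnalytics).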
        for fieldName in groupBy:
            if (fieldName not in ["Time"]):
                fields.append(fieldName)
                fieldNames.append(fieldName)
                srcFieldNames.append(fieldName)

        fields = ", ".join(fields)
        where = []

        if filter.get("slice", None):
            where.append("%%slice='%s'" % filter["slice"])
        if filter.get("site", None):
            where.append("%%site='%s'" % filter["site"])
        if filter.get("node", None):
            where.append("%%hostname='%s'" % filter["node"])
        if filter.get("event", None):
            where.append("event='%s'" % filter["event"])
        if filter.get("service", None):
            sliceNames = self.service_to_sliceNames(filter["service"])
            where.append("(" + " OR ".join(["%%slice='%s'" % sliceName for sliceName in sliceNames]) + ")")

        if where:
            where = " WHERE " + " AND ".join(where)
        else:
            where = ""
        if groupBy:
            groupBySub = " GROUP BY " + ",".join(groupBy + ["%hostname"])
            groupBy = " GROUP BY " + ",".join(groupBy)
        else:
            groupBySub = " GROUP BY %hostname"
            groupBy = ""
        if orderBy:
            orderBy = " ORDER BY " + ",".join(orderBy)
        else:
            orderBy = ""
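        # latest=True swaps the raw table for a self-join that keeps only the
        # newest row per (hostname, event) pair, so the aggregates operate on
        # current values rather than the full history.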
        if latest:
            latestFields = ["table1.%s as %s" % (x, x) for x in srcFieldNames]
            latestFields = ", ".join(latestFields)
            tablePart = """(SELECT %s FROM %s AS table1
                            JOIN
                                (SELECT %%hostname, event, max(time) as maxtime from %s GROUP BY %%hostname, event) AS latest
                            ON
                                table1.%%hostname = latest.%%hostname AND table1.event = latest.event AND table1.time = latest.maxtime)""" % (latestFields, tablePart, tablePart)
        if computed:
            subQuery = "SELECT %%hostname, %s FROM %s" % (fields, tablePart)
            if where:
                subQuery = subQuery + where
            subQuery = subQuery + groupBySub

            sumFields = []
            for fieldName in fieldNames:
                if fieldName.startswith("avg"):
                    sumFields.append("AVG(%s) as avg_%s" % (fieldName, fieldName))
                    sumFields.append("MAX(%s) as max_%s" % (fieldName, fieldName))
                elif (fieldName.startswith("count")) or (fieldName.startswith("sum")) or (fieldName.startswith("computed")):
                    sumFields.append("SUM(%s) as sum_%s" % (fieldName, fieldName))
                else:
                    sumFields.append(fieldName)

            sumFields = ",".join(sumFields)

            query = "SELECT %s, %s FROM (%s)" % ("Time", sumFields, subQuery)
            if groupBy:
                query = query + groupBy
            if orderBy:
                query = query + orderBy
        else:
            query = "SELECT %s FROM %s" % (fields, tablePart)
            if where:
                query = query + " " + where
            if groupBy:
                query = query + groupBy
            if orderBy:
                query = query + orderBy

        return query
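    # Illustrative example (a sketch; the exact SQL also depends on the "%"
    # placeholder substitution performed by BigQueryAnalytics):
    #   compose_query(avg=["%cpu"], count=["%hostname"], timeBucket=60)
    # yields roughly:
    #   SELECT SEC_TO_TIMESTAMP(INTEGER(TIMESTAMP_TO_SEC(time)/60)*60) as Time,
    #          AVG(%cpu) as avg_cpu, COUNT(distinct %hostname) as count_hostname
    #   FROM [vicci.<table>@-3600000--1] GROUP BY Time ORDER BY Time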
    def get_list_from_req(self, req, name, default=[]):
        value = req.GET.get(name, None)
        if not value:
            return default
        # allow "@" in URLs as a stand-in for the "%" field prefix, which would
        # otherwise have to be escaped as %25
        value = value.replace("@", "%")
        return value.split(",")
    def format_result(self, format, result, query, dataSourceUrl):
        if not BIGQUERY_AVAILABLE:
            msg = "BigQuery Statistics Unavailable"
        else:
            msg = None
        if (format == "json_dicts"):
            result = {"query": query, "rows": result, "dataSourceUrl": dataSourceUrl, "msg": msg}
            return ("application/javascript", json.dumps(result))
        elif (format == "json_arrays"):
            new_result = []
            for row in result:
                new_row = []
                for key in sorted(row.keys()):
                    new_row.append(row[key])
                new_result.append(new_row)
            new_result = {"query": query, "rows": new_result, "msg": msg}
            return ("application/javascript", json.dumps(new_result))
        elif (format == "html_table"):
            new_rows = []
            for row in result:
                new_row = []
                for key in sorted(row.keys()):
                    new_row.append("<TD>%s</TD>" % str(row[key]))
                new_rows.append("<TR>%s</TR>" % "".join(new_row))

            new_result = "<TABLE>%s</TABLE>" % "\n".join(new_rows)

            return ("text/html", new_result)
    def merge_datamodel_sites(self, rows, slice=None):
        """ For a query that included "site" in its groupby, merge in the
            opencloud site information.
        """

        if slice:
            try:
                slice = Slice.objects.get(name=slice)
            except:
                slice = None

        for row in rows:
            sitename = row["site"]
            try:
                model_site = Site.objects.get(name=sitename)
            except:
                # we didn't find it in the data model
                continue
            allocated_slivers = 0
            if model_site and slice:
                for sliver in slice.slivers.all():
                    if sliver.node.site == model_site:
                        allocated_slivers = allocated_slivers + 1

            row["lat"] = float(model_site.location.latitude)
            row["long"] = float(model_site.location.longitude)
            row["url"] = model_site.site_url
            row["numNodes"] = model_site.nodes.count()
            row["allocated_slivers"] = allocated_slivers
            max_cpu = row.get("max_avg_cpu", row.get("max_cpu", 0))
            cpu = float(max_cpu) / 100.0
            row["hotness"] = max(0.0, ((cpu * RED_LOAD) - BLUE_LOAD) / (RED_LOAD - BLUE_LOAD))
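            # With the thresholds above (BLUE_LOAD=5000000, RED_LOAD=10000000)
            # this reduces to max(0.0, 2*cpu - 1.0): 50% CPU maps to hotness 0.0
            # and 100% CPU to 1.0.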
    def compose_cached_query(self, querySpec='default'):
        """ Compose a query that returns the 'most recent' row for each (hostname, event)
            pair.

            Note that groupByFields cannot contain any values that are 'Null' or those
            rows will be excluded. For example, if groupByFields includes cp, then
            there will be no libvirt_event rows, since libvirt_event does not have
            a cp field.

            This means we can't really have 'one query to rule them'. Settle on
            having a couple of different queries, and have the caller specify
            which one it wants.
        """
        fieldNames = ["%hostname", "%bytes_sent", "%bytes_hit", "%healthy", "time", "event", "%site", "%elapsed", "%cpu"]

        if querySpec == "default":
            groupByFields = ["%hostname", "event"]
        elif (querySpec == "hpc"):
            fieldNames.append("%cp")
            groupByFields = ["%hostname", "event", "%cp"]
        else:
            raise ValueError("Unknown queryspec %s" % querySpec)
        fields = ["table1.%s AS %s" % (x, x) for x in fieldNames]
        fields = ", ".join(fields)

        tableDesc = "%s.%s" % (self.projectName, self.tableName)

        groupByOn = ["table1.time = latest.maxtime"]
        for field in groupByFields:
            groupByOn.append("table1.%s = latest.%s" % (field, field))

        groupByOn = " AND ".join(groupByOn)
        groupByFields = ", ".join(groupByFields)
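        # Schematically, for querySpec="default" the query built below is:
        #   SELECT table1.%hostname AS %hostname, ... FROM [proj.table@-3600000--1] AS table1
        #   JOIN (SELECT %hostname, event, max(time) as maxtime
        #         FROM [proj.table@-3600000--1] GROUP BY %hostname, event) AS latest
        #   ON table1.time = latest.maxtime AND table1.%hostname = latest.%hostname
        #      AND table1.event = latest.event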
        base_query = "SELECT %s FROM [%s@-3600000--1] AS table1 JOIN (SELECT %s, max(time) as maxtime from [%s@-3600000--1] GROUP BY %s) AS latest ON %s" % \
            (fields, tableDesc, groupByFields, tableDesc, groupByFields, groupByOn)

        return base_query
    def get_cached_query_results(self, q, wait=True):
        global glo_cached_queries

        if q in glo_cached_queries:
            # serve cached rows if they are less than 60 seconds old
            if (time.time() - glo_cached_queries[q]["time"]) <= 60:
                print "using cached query"
                return glo_cached_queries[q]["rows"]

        if not wait:
            return None

        print "refreshing cached query"
        result = self.run_query(q)
        glo_cached_queries[q] = {"time": time.time(), "rows": result}

        return result
    def process_request(self, req):
        tqx = req.GET.get("tqx", None)

        slice = req.GET.get("slice", None)
        site = req.GET.get("site", None)
        node = req.GET.get("node", None)
        service = req.GET.get("service", None)
        event = req.GET.get("event", "libvirt_heartbeat")
        cp = req.GET.get("cp", None)

        format = req.GET.get("format", "json_dicts")

        timeBucket = int(req.GET.get("timeBucket", 60))
        avg = self.get_list_from_req(req, "avg")
        sum = self.get_list_from_req(req, "sum")
        count = self.get_list_from_req(req, "count")
        computed = self.get_list_from_req(req, "computed")
        groupBy = self.get_list_from_req(req, "groupBy", ["Time"])
        orderBy = self.get_list_from_req(req, "orderBy", ["Time"])

        maxRows = req.GET.get("maxRows", None)
        mergeDataModelSites = req.GET.get("mergeDataModelSites", None)

        maxAge = int(req.GET.get("maxAge", 60*60))

        cached = req.GET.get("cached", None)
        cachedGroupBy = self.get_list_from_req(req, "cachedGroupBy", ["doesnotexist"])
        filter = {}
        if slice:
            filter["slice"] = slice
        if site:
            filter["site"] = site
        if node:
            filter["hostname"] = node
        if event:
            filter["event"] = event
        if cp:
            filter["cp"] = cp
        q = self.compose_query(filter, timeBucket, avg, sum, count, computed, [], groupBy, orderBy, maxAge=maxAge)
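        # Advertise this request back as a Google Visualization data source URL:
        # preserve the caller's format as origFormat, percent-escape "%" signs,
        # and force format=charts for the chart client.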
        dataSourceUrl = "http://" + req.META["SERVER_NAME"] + ":" + req.META["SERVER_PORT"] + req.META["PATH_INFO"] + "?" + req.META["QUERY_STRING"].replace("format=","origFormat=").replace("%","%25") + "&format=charts"
        if (format == "dataSourceUrl"):
            result = {"dataSourceUrl": dataSourceUrl}
            return ("application/javascript", json.dumps(result))
        elif (format == "raw"):
            result = self.run_query_raw(q)
            result["dataSourceUrl"] = dataSourceUrl

            result = json.dumps(result)

            return ("application/javascript", result)
        elif (format == "nodata"):
            result = {"dataSourceUrl": dataSourceUrl, "query": q}
            result = json.dumps(result)
            return ("application/javascript", result)
        elif (format == "charts"):
            bq_result = self.run_query_raw(q)

            # cloudscrutiny code is probably better!
            table = {}
            table["cols"] = self.schema_to_cols(bq_result["schema"])
            rows = []
            if "rows" in bq_result:
                for row in bq_result["rows"]:
                    rowcols = []
                    for (colnum, col) in enumerate(row["f"]):
                        if (colnum == 0):
                            # first column is the Time bucket; emit a JavaScript Date
                            dt = datetime.datetime.fromtimestamp(float(col["v"]))
                            rowcols.append({"v": 'new Date("%s")' % dt.isoformat()})
                        else:
                            try:
                                rowcols.append({"v": float(col["v"])})
                            except:
                                rowcols.append({"v": col["v"]})
                    rows.append({"c": rowcols})
            table["rows"] = rows

            if tqx:
                # tqx is a set of colon-separated key:value pairs, e.g. "reqId:0"
                reqId = tqx.strip("reqId:")
            else:
                reqId = "0"

            # the Google Visualization wire protocol expects status "ok"
            result = {"status": "ok", "reqId": reqId, "table": table, "version": "0.6"}

            result = "google.visualization.Query.setResponse(" + json.dumps(result) + ");"
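            # json.dumps() left the "new Date(...)" values as quoted strings, but
            # the Google Visualization client needs raw JavaScript Date
            # constructors; strip the surrounding quotes and unescape the
            # embedded ones.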
            def unquote_it(x): return x.group()[1:-1].replace('\\"', '"')

            p = re.compile(r'"new Date\(\\"[^"]*\\"\)"')
            result = p.sub(unquote_it, result)

            return ("application/javascript", result)
        else:
            if cached:
                results = self.get_cached_query_results(self.compose_cached_query(cached))

                result = self.postprocess_results(results, filter=filter, sum=sum, count=count, avg=avg, computed=computed, maxDeltaTime=120, groupBy=cachedGroupBy)
            else:
                result = self.run_query(q)

            if maxRows:
                result = result[-int(maxRows):]

            if mergeDataModelSites:
                self.merge_datamodel_sites(result)

            return self.format_result(format, result, q, dataSourceUrl)
def DoPlanetStackAnalytics(request):
    bq = PlanetStackAnalytics()
    result = bq.process_request(request)

    return result
def main():
    bq = PlanetStackAnalytics(tableName="demoevents")

    q = bq.compose_cached_query()
    results = bq.run_query(q)
    #results = bq.postprocess_results(results,
    #                 filter={"slice": "HyperCache"},
    #                 maxi=["cpu"],
    #                 computed=["bytes_sent/elapsed"],
    #                 sum=["bytes_sent", "computed_bytes_sent_div_elapsed"], avg=["cpu"],
    #                 groupBy=["Time", "site"], maxDeltaTime=60)

    #results = bq.postprocess_results(results, filter={"slice": "HyperCache"}, maxi=["cpu"], count=["hostname"], computed=["bytes_sent/elapsed"], groupBy=["Time", "site"], maxDeltaTime=80)
    results = bq.postprocess_results(results, filter={"event": "libvirt_heartbeat"}, avg=["cpu"], count=["hostname"], groupBy=["doesnotexist"])

    bq.dump_table(results)
    q = bq.compose_query(sum=["%bytes_sent"], avg=["%cpu"], latest=True, groupBy=["Time", "%site"])
    bq.dump_table(bq.run_query(q))

    # compose_query takes no "slice" keyword; the slice filter belongs in filter=
    q = bq.compose_query(avg=["%cpu", "%bandwidth"], count=["%hostname"], filter={"slice": "HyperCache"})
    bq.dump_table(bq.run_query(q))

    q = bq.compose_query(computed=["%bytes_sent/%elapsed"])
    bq.dump_table(bq.run_query(q))

    q = bq.compose_query(timeBucket=60*60, avg=["%cpu"], count=["%hostname"], computed=["%bytes_sent/%elapsed"])
    bq.dump_table(bq.run_query(q))
if __name__ == "__main__":
    main()