from bigquery_analytics import BigQueryAnalytics
import datetime
import re
import os
import sys
import time
import json

if os.path.exists("/home/smbaker/projects/vicci/plstackapi/planetstack"):
    sys.path.append("/home/smbaker/projects/vicci/plstackapi/planetstack")
else:
    sys.path.append("/opt/planetstack")

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings")
from django.conf import settings
from django.db import connection
from core.models import Slice, Sliver, ServiceClass, Reservation, Tag, Network, User, Node, Image, Deployment, Site, NetworkTemplate, NetworkSlice, Service

# Load thresholds for the site "hotness" computation in merge_datamodel_sites:
# hotness scales from 0.0 at BLUE_LOAD/RED_LOAD of full load (~33% CPU) up to
# 1.0 at 100% CPU.
BLUE_LOAD = 5000000
RED_LOAD = 15000000

glo_cached_queries = {}

class PlanetStackAnalytics(BigQueryAnalytics):
    def __init__(self, tableName=None):
        if not tableName:
            tableName = settings.BIGQUERY_TABLE

        BigQueryAnalytics.__init__(self, tableName)

    def service_to_sliceNames(self, serviceName):
        service = Service.objects.get(name=serviceName)
        try:
            slices = service.slices.all()
        except:
            # BUG in data model -- Slice.service has related name 'service' and
            # it should be 'slices'
            slices = service.service.all()

        return [slice.name for slice in slices]

    def compose_query(self, filter={}, timeBucket="60", avg=[], sum=[], count=[], computed=[], val=[], groupBy=["Time"], orderBy=["Time"], tableName=None, latest=False, maxAge=60*60):
        if not tableName:
            tableName = self.tableName

        # BigQuery table decorators take milliseconds
        maxAge = maxAge * 1000
        tablePart = "[%s.%s@-%d--1]" % ("vicci", tableName, maxAge)

        fields = []
        fieldNames = []
        srcFieldNames = ["time"]

        fields.append("SEC_TO_TIMESTAMP(INTEGER(TIMESTAMP_TO_SEC(time)/%s)*%s) as Time" % (str(timeBucket), str(timeBucket)))
        #fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/%s)*%s as Time" % (str(timeBucket),str(timeBucket)))

        for fieldName in avg:
            fields.append("AVG(%s) as avg_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("avg_%s" % fieldName.replace("%",""))
            srcFieldNames.append(fieldName)

        for fieldName in sum:
            fields.append("SUM(%s) as sum_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("sum_%s" % fieldName.replace("%",""))
            srcFieldNames.append(fieldName)

        for fieldName in count:
            fields.append("COUNT(distinct %s) as count_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("count_%s" % fieldName.replace("%",""))
            srcFieldNames.append(fieldName)

        for fieldName in val:
            fields.append(fieldName)
            fieldNames.append(fieldName)
            srcFieldNames.append(fieldName)

        for fieldName in computed:
            if "/" in fieldName:
                operator = "/"
                parts = fieldName.split("/")
                computedFieldName = "computed_" + parts[0].replace("%","") + "_div_" + parts[1].replace("%","")
            else:
                operator = "*"
                parts = fieldName.split("*")
                computedFieldName = "computed_" + parts[0].replace("%","") + "_mult_" + parts[1].replace("%","")
            fields.append("SUM(%s)%sSUM(%s) as %s" % (parts[0], operator, parts[1], computedFieldName))
            fieldNames.append(computedFieldName)
            srcFieldNames.append(parts[0])
            srcFieldNames.append(parts[1])

        for fieldName in groupBy:
            if (fieldName not in ["Time"]):
                fields.append(fieldName)
                fieldNames.append(fieldName)
                srcFieldNames.append(fieldName)

        fields = ", ".join(fields)

        where = []

        if filter.get("slice", None):
            where.append("%%slice='%s'" % filter["slice"])
        if filter.get("site", None):
            where.append("%%site='%s'" % filter["site"])
        if filter.get("node", None):
            where.append("%%hostname='%s'" % filter["node"])
        if filter.get("hostname", None):
            # process_request stores the node filter under "hostname"
            where.append("%%hostname='%s'" % filter["hostname"])
        if filter.get("event", None):
            where.append("event='%s'" % filter["event"])
        if filter.get("service", None):
            sliceNames = self.service_to_sliceNames(filter["service"])
            if sliceNames:
                where.append("(" + " OR ".join(["%%slice='%s'" % sliceName for sliceName in sliceNames]) + ")")

        if where:
            where = " WHERE " + " AND ".join(where)
        else:
            where = ""

        if groupBy:
            groupBySub = " GROUP BY " + ",".join(groupBy + ["%hostname"])
            groupBy = " GROUP BY " + ",".join(groupBy)
        else:
            groupBySub = " GROUP BY %hostname"
            groupBy = ""

        if orderBy:
            orderBy = " ORDER BY " + ",".join(orderBy)
        else:
            orderBy = ""

        if latest:
            # restrict the table to the most recent row for each
            # (hostname, event) pair
            latestFields = ["table1.%s as %s" % (x, x) for x in srcFieldNames]
            latestFields = ", ".join(latestFields)
            tablePart = """(SELECT %s FROM %s AS table1
                            JOIN
                                (SELECT %%hostname, event, max(time) as maxtime from %s GROUP BY %%hostname, event) AS latest
                            ON
                                table1.%%hostname = latest.%%hostname AND table1.event = latest.event AND table1.time = latest.maxtime)""" % (latestFields, tablePart, tablePart)

        if computed:
            # computed fields need a two-stage query: an inner query that
            # aggregates per hostname, and an outer query that aggregates the
            # per-host results
            subQuery = "SELECT %%hostname, %s FROM %s" % (fields, tablePart)
            if where:
                subQuery = subQuery + where
            subQuery = subQuery + groupBySub

            sumFields = []
            for fieldName in fieldNames:
                if fieldName.startswith("avg"):
                    sumFields.append("AVG(%s) as avg_%s" % (fieldName, fieldName))
                    sumFields.append("MAX(%s) as max_%s" % (fieldName, fieldName))
                elif (fieldName.startswith("count")) or (fieldName.startswith("sum")) or (fieldName.startswith("computed")):
                    sumFields.append("SUM(%s) as sum_%s" % (fieldName, fieldName))
                else:
                    sumFields.append(fieldName)

            sumFields = ",".join(sumFields)

            query = "SELECT %s, %s FROM (%s)" % ("Time", sumFields, subQuery)
            if groupBy:
                query = query + groupBy
            if orderBy:
                query = query + orderBy
        else:
            query = "SELECT %s FROM %s" % (fields, tablePart)
            if where:
                query = query + " " + where
            if groupBy:
                query = query + groupBy
            if orderBy:
                query = query + orderBy

        return query
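
    # Illustrative sketch (not executed) of the SQL this method emits; the
    # exact text depends on the arguments. For roughly
    #   compose_query(filter={"slice": "HyperCache"}, avg=["%cpu"], count=["%hostname"])
    # the single-stage branch produces something like:
    #   SELECT SEC_TO_TIMESTAMP(INTEGER(TIMESTAMP_TO_SEC(time)/60)*60) as Time,
    #          AVG(%cpu) as avg_cpu, COUNT(distinct %hostname) as count_hostname
    #   FROM [vicci.demoevents@-3600000--1]
    #   WHERE %slice='HyperCache' GROUP BY Time ORDER BY Time
    # When computed fields are requested, the same pieces are wrapped in the
    # two-stage form: an inner per-hostname GROUP BY feeding an outer
    # aggregate. "%"-prefixed names are placeholders that the
    # BigQueryAnalytics layer is assumed to resolve to per-table column names
    # before submission.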

    def get_list_from_req(self, req, name, default=[]):
        value = req.GET.get(name, None)
        if not value:
            return default
        # "@" is accepted in URLs as a stand-in for the "%" field prefix
        value = value.replace("@", "%")
        return value.split(",")

    def format_result(self, format, result, query, dataSourceUrl):
        if (format == "json_dicts"):
            result = {"query": query, "rows": result, "dataSourceUrl": dataSourceUrl}
            return ("application/javascript", json.dumps(result))

        elif (format == "json_arrays"):
            new_result = []
            for row in result:
                new_row = []
                for key in sorted(row.keys()):
                    new_row.append(row[key])
                new_result.append(new_row)
            new_result = {"query": query, "rows": new_result}
            return ("application/javascript", json.dumps(new_result))

        elif (format == "html_table"):
            new_rows = []
            for row in result:
                new_row = []
                for key in sorted(row.keys()):
                    new_row.append("<TD>%s</TD>" % str(row[key]))
                new_rows.append("<TR>%s</TR>" % "".join(new_row))

            new_result = "<TABLE>%s</TABLE>" % "\n".join(new_rows)

            return ("text/html", new_result)

    def merge_datamodel_sites(self, rows, slice=None):
        """ For a query that included "site" in its groupby, merge in the
            opencloud site information.
        """

        if slice:
            try:
                slice = Slice.objects.get(name=slice)
            except:
                slice = None

        for row in rows:
            sitename = row["site"]
            try:
                model_site = Site.objects.get(name=sitename)
            except:
                # we didn't find it in the data model
                continue

            allocated_slivers = 0
            if model_site and slice:
                for sliver in slice.slivers.all():
                    if sliver.node.site == model_site:
                        allocated_slivers = allocated_slivers + 1

            row["lat"] = float(model_site.location.latitude)
            row["long"] = float(model_site.location.longitude)
            row["url"] = model_site.site_url
            row["numNodes"] = model_site.nodes.count()
            row["allocated_slivers"] = allocated_slivers

            max_cpu = row.get("max_avg_cpu", row.get("max_cpu", 0))
            cpu = float(max_cpu) / 100.0
            row["hotness"] = max(0.0, ((cpu * RED_LOAD) - BLUE_LOAD) / (RED_LOAD - BLUE_LOAD))

    def compose_cached_query(self, querySpec='default'):
        """ Compose a query that returns the 'most recent' row for each (hostname, event)
            pair.

            Note that groupByFields cannot contain any values that are 'Null' or those
            rows will be excluded. For example, if groupByFields includes cp, then
            there will be no libvirt_event rows, since libvirt_event does not have
            cp.

            This means we can't really have 'one query to rule them all'. Settle on
            having a couple of different queries, and have the caller specify
            which one it wants.
        """

        fieldNames = ["%hostname", "%bytes_sent", "%bytes_hit", "%healthy", "time", "event", "%site", "%elapsed", "%cpu"]

        if querySpec == "default":
            groupByFields = ["%hostname", "event"]
        elif (querySpec == "hpc"):
            fieldNames.append("%cp")
            groupByFields = ["%hostname", "event", "%cp"]
        else:
            raise ValueError("Unknown querySpec %s" % querySpec)

        fields = ["table1.%s AS %s" % (x, x) for x in fieldNames]
        fields = ", ".join(fields)

        tableDesc = "%s.%s" % (self.projectName, self.tableName)

        groupByOn = ["table1.time = latest.maxtime"]
        for field in groupByFields:
            groupByOn.append("table1.%s = latest.%s" % (field, field))

        groupByOn = " AND ".join(groupByOn)
        groupByFields = ", ".join(groupByFields)

        base_query = "SELECT %s FROM [%s@-3600000--1] AS table1 JOIN (SELECT %s, max(time) as maxtime from [%s@-3600000--1] GROUP BY %s) AS latest ON %s" % \
                     (fields, tableDesc, groupByFields, tableDesc, groupByFields, groupByOn)

        return base_query
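
    # Sketch of the generated query shape (field names shown before "%"
    # remapping; <project>.<table> stands for the configured table):
    #   SELECT table1.%hostname AS %hostname, ..., table1.%cpu AS %cpu
    #   FROM [<project>.<table>@-3600000--1] AS table1
    #   JOIN (SELECT %hostname, event, max(time) as maxtime
    #         FROM [<project>.<table>@-3600000--1]
    #         GROUP BY %hostname, event) AS latest
    #   ON table1.time = latest.maxtime AND table1.%hostname = latest.%hostname
    #      AND table1.event = latest.event
    # The "@-3600000--1" table decorator restricts the scan to the last hour
    # (decorators are expressed in milliseconds).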

    def get_cached_query_results(self, q, wait=True):
        global glo_cached_queries

        if q in glo_cached_queries:
            if (time.time() - glo_cached_queries[q]["time"]) <= 60:
                print "using cached query"
                return glo_cached_queries[q]["rows"]

        if not wait:
            return None

        print "refreshing cached query"
        result = self.run_query(q)
        glo_cached_queries[q] = {"time": time.time(), "rows": result}

        return result
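
    # Results are cached per query string and expire after 60 seconds, so
    # concurrent dashboard requests issuing the same cached query share a
    # single BigQuery round trip. Illustrative use:
    #   rows = self.get_cached_query_results(self.compose_cached_query("hpc"))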

    def process_request(self, req):
        tqx = req.GET.get("tqx", None)

        slice = req.GET.get("slice", None)
        site = req.GET.get("site", None)
        node = req.GET.get("node", None)
        service = req.GET.get("service", None)
        event = req.GET.get("event", "libvirt_heartbeat")
        cp = req.GET.get("cp", None)

        format = req.GET.get("format", "json_dicts")

        timeBucket = int(req.GET.get("timeBucket", 60))
        avg = self.get_list_from_req(req, "avg")
        sum = self.get_list_from_req(req, "sum")
        count = self.get_list_from_req(req, "count")
        computed = self.get_list_from_req(req, "computed")
        groupBy = self.get_list_from_req(req, "groupBy", ["Time"])
        orderBy = self.get_list_from_req(req, "orderBy", ["Time"])

        maxRows = req.GET.get("maxRows", None)
        mergeDataModelSites = req.GET.get("mergeDataModelSites", None)

        maxAge = int(req.GET.get("maxAge", 60*60))

        cached = req.GET.get("cached", None)
        cachedGroupBy = self.get_list_from_req(req, "cachedGroupBy", ["doesnotexist"])

        filter = {}
        if slice:
            filter["slice"] = slice
        if site:
            filter["site"] = site
        if node:
            filter["hostname"] = node
        if event:
            filter["event"] = event
        if service:
            filter["service"] = service

        q = self.compose_query(filter, timeBucket, avg, sum, count, computed, [], groupBy, orderBy, maxAge=maxAge)

        dataSourceUrl = "http://" + req.META["SERVER_NAME"] + ":" + req.META["SERVER_PORT"] + req.META["PATH_INFO"] + "?" + req.META["QUERY_STRING"].replace("format=", "origFormat=").replace("%", "%25") + "&format=charts"

        if (format == "dataSourceUrl"):
            result = {"dataSourceUrl": dataSourceUrl}
            return ("application/javascript", result)

        elif (format == "raw"):
            result = self.run_query_raw(q)
            result["dataSourceUrl"] = dataSourceUrl
            result = json.dumps(result)
            return ("application/javascript", result)

        elif (format == "nodata"):
            result = {"dataSourceUrl": dataSourceUrl, "query": q}
            result = json.dumps(result)
            return ("application/javascript", result)

        elif (format == "charts"):
            bq_result = self.run_query_raw(q)

            # convert the BigQuery result into a Google Visualization
            # DataTable (the cloudscrutiny code is probably better!)
            table = {}
            table["cols"] = self.schema_to_cols(bq_result["schema"])
            rows = []
            if "rows" in bq_result:
                for row in bq_result["rows"]:
                    rowcols = []
                    for (colnum, col) in enumerate(row["f"]):
                        if (colnum == 0):
                            # column 0 is the timestamp
                            dt = datetime.datetime.fromtimestamp(float(col["v"]))
                            rowcols.append({"v": 'new Date("%s")' % dt.isoformat()})
                        else:
                            try:
                                rowcols.append({"v": float(col["v"])})
                            except:
                                rowcols.append({"v": col["v"]})
                    rows.append({"c": rowcols})
            table["rows"] = rows

            if tqx:
                # tqx is typically "reqId:<n>"; strip the label characters,
                # leaving the numeric id
                reqId = tqx.strip("reqId:")
            else:
                reqId = 0

            result = {"status": "okColumnChart", "reqId": reqId, "table": table, "version": "0.6"}

            result = "google.visualization.Query.setResponse(" + json.dumps(result) + ");"

            # strip the quotes from the new Date() expressions so they are
            # evaluated as javascript rather than treated as strings
            def unquote_it(x): return x.group()[1:-1].replace('\\"', '"')

            p = re.compile(r'"new Date\(\\"[^"]*\\"\)"')
            result = p.sub(unquote_it, result)

            return ("application/javascript", result)

        else:
            if cached:
                results = self.get_cached_query_results(self.compose_cached_query(cached))
                result = self.postprocess_results(results, filter=filter, sum=sum, count=count, avg=avg, computed=computed, maxDeltaTime=120, groupBy=cachedGroupBy)
            else:
                result = self.run_query(q)

            if maxRows:
                result = result[-int(maxRows):]

            if mergeDataModelSites:
                self.merge_datamodel_sites(result)

            return self.format_result(format, result, q, dataSourceUrl)
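
# Illustrative request against this view (host and path are hypothetical):
#   http://server/analytics/bigquery/?avg=@cpu&count=@hostname&groupBy=Time,@site&format=json_dicts
# "@" in list-valued parameters stands in for the "%" field prefix, which is
# awkward to send literally in a URL query string (see get_list_from_req).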

def DoPlanetStackAnalytics(request):
    bq = PlanetStackAnalytics()
    result = bq.process_request(request)

    return result

def main():
    bq = PlanetStackAnalytics(tableName="demoevents")

    q = bq.compose_cached_query()
    results = bq.run_query(q)

    #results = bq.postprocess_results(results,
    #                                 filter={"slice": "HyperCache"},
    #                                 maxi=["cpu"],
    #                                 computed=["bytes_sent/elapsed"],
    #                                 sum=["bytes_sent", "computed_bytes_sent_div_elapsed"], avg=["cpu"],
    #                                 groupBy=["Time", "site"], maxDeltaTime=80)

    #results = bq.postprocess_results(results, filter={"slice": "HyperCache"}, maxi=["cpu"], count=["hostname"], computed=["bytes_sent/elapsed"], groupBy=["Time", "site"], maxDeltaTime=80)

    results = bq.postprocess_results(results, filter={"event": "libvirt_heartbeat"}, avg=["cpu"], count=["hostname"], groupBy=["doesnotexist"])

    bq.dump_table(results)

    q = bq.compose_query(sum=["%bytes_sent"], avg=["%cpu"], latest=True, groupBy=["Time", "%site"])
    print q
    bq.dump_table(bq.run_query(q))

    q = bq.compose_query(filter={"slice": "HyperCache"}, avg=["%cpu", "%bandwidth"], count=["%hostname"])
    print q
    bq.dump_table(bq.run_query(q))

    q = bq.compose_query(computed=["%bytes_sent/%elapsed"])
    print q
    bq.dump_table(bq.run_query(q))

    q = bq.compose_query(timeBucket=60*60, avg=["%cpu"], count=["%hostname"], computed=["%bytes_sent/%elapsed"])
    print q
    bq.dump_table(bq.run_query(q))

if __name__ == "__main__":
    main()