X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=planetstack%2Fhpc_wizard%2Fplanetstack_analytics.py;h=75462d4eb029b27fae4ceecacf7cdce97e3db7c6;hb=f2c26de209b613662ad6410dd1b7afdc1ec6ca14;hp=d567e22e1886fb03dfed31f1a230a088a5a8f032;hpb=78ab1011ba7c67b793fa055be1bac50668165d34;p=plstackapi.git diff --git a/planetstack/hpc_wizard/planetstack_analytics.py b/planetstack/hpc_wizard/planetstack_analytics.py index d567e22..75462d4 100644 --- a/planetstack/hpc_wizard/planetstack_analytics.py +++ b/planetstack/hpc_wizard/planetstack_analytics.py @@ -1,35 +1,80 @@ -from bigquery_analytics import BigQueryAnalytics +from bigquery_analytics import BigQueryAnalytics, BIGQUERY_AVAILABLE +import datetime +import re +import os +import sys +import time import json import traceback +import urllib2 + +if os.path.exists("/home/smbaker/projects/vicci/plstackapi/planetstack"): + sys.path.append("/home/smbaker/projects/vicci/plstackapi/planetstack") +else: + sys.path.append("/opt/planetstack") + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings") +from django.conf import settings +from django import db +from django.db import connection +from core.models import Slice, Sliver, ServiceClass, Reservation, Tag, Network, User, Node, Image, Deployment, Site, NetworkTemplate, NetworkSlice, Service + +BLUE_LOAD=5000000 +RED_LOAD=15000000 + +glo_cached_queries = {} class PlanetStackAnalytics(BigQueryAnalytics): - def __init__(self, tableName="demoevents"): + def __init__(self, tableName=None): + if not tableName: + tableName = settings.BIGQUERY_TABLE + BigQueryAnalytics.__init__(self, tableName) - def compose_query(self, slice=None, site=None, node=None, timeField="MinuteTime", avg=[], sum=[], count=[], computed=[], groupBy=["MinuteTime"], orderBy=["MinuteTime"], tableName="demoevents"): - tablePart = "%s.%s@-3600000--1" % ("vicci", tableName) + def service_to_sliceNames(self, serviceName): + service=Service.objects.get(name=serviceName) + try: + slices = service.slices.all() + except: + # BUG in data model -- Slice.service has related name 'service' and + # it should be 'slices' + slices = service.service.all() + + return [slice.name for slice in slices] + + def compose_query(self, filter={}, timeBucket="60", avg=[], sum=[], count=[], computed=[], val=[], groupBy=["Time"], orderBy=["Time"], tableName=None, latest=False, maxAge=60*60): + if tableName is None: + tableName = self.tableName + + maxAge = maxAge * 1000 + tablePart = "[%s.%s@-%d--1]" % ("vicci", tableName, maxAge) fields = [] fieldNames = [] + srcFieldNames = ["time"] - if (timeField=="MinuteTime"): - fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/60)*60 as MinuteTime") - elif (timeField=="HourTime"): - fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/60/60)*60*60 as HourTime") - elif (timeField=="DayTime"): - fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/60/60/24)*60*60*24 as DayTime") + fields.append("SEC_TO_TIMESTAMP(INTEGER(TIMESTAMP_TO_SEC(time)/%s)*%s) as Time" % (str(timeBucket),str(timeBucket))) + #fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/%s)*%s as Time" % (str(timeBucket),str(timeBucket))) for fieldName in avg: fields.append("AVG(%s) as avg_%s" % (fieldName, fieldName.replace("%",""))) fieldNames.append("avg_%s" % fieldName.replace("%","")) + srcFieldNames.append(fieldName) for fieldName in sum: fields.append("SUM(%s) as sum_%s" % (fieldName, fieldName.replace("%",""))) fieldNames.append("sum_%s" % fieldName.replace("%","")) + srcFieldNames.append(fieldName) for fieldName in count: fields.append("COUNT(distinct %s) as count_%s" % (fieldName, fieldName.replace("%",""))) fieldNames.append("count_%s" % fieldName.replace("%","")) + srcFieldNames.append(fieldName) + + for fieldName in val: + fields.append(fieldName) + fieldNames.append(fieldName) + srcFieldNames.append(fieldName) for fieldName in computed: operator = "/" @@ -41,17 +86,31 @@ class PlanetStackAnalytics(BigQueryAnalytics): computedFieldName = "computed_" + parts[0].replace("%","")+"_mult_"+parts[1].replace("%","") fields.append("SUM(%s)%sSUM(%s) as %s" % (parts[0], operator, parts[1], computedFieldName)) fieldNames.append(computedFieldName) + srcFieldNames.append(parts[0]) + srcFieldNames.append(parts[1]) + + for fieldName in groupBy: + if (fieldName not in ["Time"]): + fields.append(fieldName) + fieldNames.append(fieldName) + srcFieldNames.append(fieldName) fields = ", ".join(fields) where = [] - if slice: - where.append("%%slice='%s'" % slice) - if site: - where.append("%%site='%s'" % site) - if node: - where.append("%%hostname='%s'" % node) + if filter.get("slice",None): + where.append("%%slice='%s'" % filter["slice"]) + if filter.get("site",None): + where.append("%%site='%s'" % filter["site"]) + if filter.get("node",None): + where.append("%%hostname='%s'" % filter["node"]) + if filter.get("event",None): + where.append("event='%s'" % filter["event"]) + if filter.get("service",None): + sliceNames = self.service_to_sliceNames(filter["service"]) + if sliceNames: + where.append("(" + " OR ".join(["%%slice='%s'" % sliceName for sliceName in sliceNames]) +")") if where: where = " WHERE " + " AND ".join(where) @@ -59,8 +118,10 @@ class PlanetStackAnalytics(BigQueryAnalytics): where ="" if groupBy: + groupBySub = " GROUP BY " + ",".join(groupBy + ["%hostname"]) groupBy = " GROUP BY " + ",".join(groupBy) else: + groupBySub = " GROUP BY %hostname" groupBy = "" if orderBy: @@ -68,28 +129,40 @@ class PlanetStackAnalytics(BigQueryAnalytics): else: orderBy = "" + if latest: + latestFields = ["table1.%s as %s" % (x,x) for x in srcFieldNames] + latestFields = ", ".join(latestFields) + tablePart = """(SELECT %s FROM %s AS table1 + JOIN + (SELECT %%hostname, event, max(time) as maxtime from %s GROUP BY %%hostname, event) AS latest + ON + table1.%%hostname = latest.%%hostname AND table1.event = latest.event AND table1.time = latest.maxtime)""" % (latestFields, tablePart, tablePart) + if computed: - subQuery = "SELECT %%hostname, %s FROM [%s]" % (fields, tablePart) + subQuery = "SELECT %%hostname, %s FROM %s" % (fields, tablePart) if where: subQuery = subQuery + where - subQuery = subQuery + " GROUP BY %s,%%hostname" % timeField + subQuery = subQuery + groupBySub sumFields = [] for fieldName in fieldNames: if fieldName.startswith("avg"): sumFields.append("AVG(%s) as avg_%s"%(fieldName,fieldName)) - else: + sumFields.append("MAX(%s) as max_%s"%(fieldName,fieldName)) + elif (fieldName.startswith("count")) or (fieldName.startswith("sum")) or (fieldName.startswith("computed")): sumFields.append("SUM(%s) as sum_%s"%(fieldName,fieldName)) + else: + sumFields.append(fieldName) sumFields = ",".join(sumFields) - query = "SELECT %s, %s FROM (%s)" % (timeField, sumFields, subQuery) + query = "SELECT %s, %s FROM (%s)" % ("Time", sumFields, subQuery) if groupBy: query = query + groupBy if orderBy: query = query + orderBy else: - query = "SELECT %s FROM [%s]" % (fields, tablePart) + query = "SELECT %s FROM %s" % (fields, tablePart) if where: query = query + " " + where if groupBy: @@ -103,11 +176,17 @@ class PlanetStackAnalytics(BigQueryAnalytics): value = req.GET.get(name, None) if not value: return default + value=value.replace("@","%") return value.split(",") - def format_result(self, format, result, query): + def format_result(self, format, result, query, dataSourceUrl): + if not BIGQUERY_AVAILABLE: + msg = "BigQuery Statistics Unavaiable" + else: + msg = None + if (format == "json_dicts"): - result = {"query": query, "rows": result} + result = {"query": query, "rows": result, "dataSourceUrl": dataSourceUrl, "msg": msg} return ("application/javascript", json.dumps(result)) elif (format == "json_arrays"): @@ -117,7 +196,7 @@ class PlanetStackAnalytics(BigQueryAnalytics): for key in sorted(row.keys()): new_row.append(row[key]) new_result.append(new_row) - new_result = {"query": query, "rows": new_result} + new_result = {"query": query, "rows": new_result, "msg": msg} return ("application/javascript", json.dumps(new_result)) elif (format == "html_table"): @@ -132,43 +211,217 @@ class PlanetStackAnalytics(BigQueryAnalytics): return ("text/html", new_result) + def merge_datamodel_sites(self, rows, slice=None): + """ For a query that included "site" in its groupby, merge in the + opencloud site information. + """ + + if slice: + try: + slice = Slice.objects.get(name=slice) + except: + slice = None + + for row in rows: + sitename = row["site"] + try: + model_site = Site.objects.get(name=sitename) + except: + # we didn't find it in the data model + continue + + allocated_slivers = 0 + if model_site and slice: + for sliver in slice.slivers.all(): + if sliver.node.site == model_site: + allocated_slivers = allocated_slivers + 1 + + row["lat"] = float(model_site.location.latitude) + row["long"] = float(model_site.location.longitude) + row["url"] = model_site.site_url + row["numNodes"] = model_site.nodes.count() + row["allocated_slivers"] = allocated_slivers + + max_cpu = row.get("max_avg_cpu", row.get("max_cpu",0)) + cpu=float(max_cpu)/100.0 + row["hotness"] = max(0.0, ((cpu*RED_LOAD) - BLUE_LOAD)/(RED_LOAD-BLUE_LOAD)) + + def compose_cached_query(self, querySpec='default'): + """ Compose a query that returns the 'most recent' row for each (hostname, event) + pair. + + Note that groupByFields cannot contain any values that are 'Null' or those + rows will be excluded. For example, if groupByFields includes cp, then + there will be no libvirt_event rows, since libvirt_event does not have + cp. + + This means we can't really have 'one query to rule them'. Settle on + having a couple of different queries, and have the caller specify + which one he wants. + """ + + fieldNames = ["%hostname", "%bytes_sent", "%bytes_hit", "%healthy", "time", "event", "%site", "%elapsed", "%cpu"] + + if querySpec=="default": + groupByFields = ["%hostname", "event"] + elif (querySpec=="hpc"): + fieldNames.append("%cp") + groupByFields = ["%hostname", "event", "%cp"] + else: + raise ValueError("Unknown queryspec %s" % querySpec) + + fields = ["table1.%s AS %s" % (x,x) for x in fieldNames] + fields = ", ".join(fields) + + tableDesc = "%s.%s" % (self.projectName, self.tableName) + + groupByOn = ["table1.time = latest.maxtime"] + for field in groupByFields: + groupByOn.append("table1.%s = latest.%s" % (field, field)) + + groupByOn = " AND ".join(groupByOn) + groupByFields = ", ".join(groupByFields) + + base_query = "SELECT %s FROM [%s@-3600000--1] AS table1 JOIN (SELECT %s, max(time) as maxtime from [%s@-3600000--1] GROUP BY %s) AS latest ON %s" % \ + (fields, tableDesc, groupByFields, tableDesc, groupByFields, groupByOn) + + return base_query + + def get_cached_query_results(self, q, wait=True): + global glo_cached_queries + + if q in glo_cached_queries: + if (time.time() - glo_cached_queries[q]["time"]) <= 60: + print "using cached query" + return glo_cached_queries[q]["rows"] + + if not wait: + return None + + print "refreshing cached query" + result = self.run_query(q) + glo_cached_queries[q] = {"time": time.time(), "rows": result} + + return result + def process_request(self, req): print req.GET - tqx = req.GET.get("reqId", None) + tqx = req.GET.get("tqx", None) slice = req.GET.get("slice", None) site = req.GET.get("site", None) node = req.GET.get("node", None) + service = req.GET.get("service", None) + event = req.GET.get("event", "libvirt_heartbeat") + cp = req.GET.get("cp", None) format = req.GET.get("format", "json_dicts") - timeField = req.GET.get("timeField", "MinuteTime") + timeBucket = int(req.GET.get("timeBucket", 60)) avg = self.get_list_from_req(req, "avg") sum = self.get_list_from_req(req, "sum") count = self.get_list_from_req(req, "count") computed = self.get_list_from_req(req, "computed") - groupBy = self.get_list_from_req(req, "groupBy", ["MinuteTime"]) - orderBy = self.get_list_from_req(req, "orderBy", ["MinuteTime"]) + groupBy = self.get_list_from_req(req, "groupBy", ["Time"]) + orderBy = self.get_list_from_req(req, "orderBy", ["Time"]) maxRows = req.GET.get("maxRows", None) + mergeDataModelSites = req.GET.get("mergeDataModelSites", None) + + maxAge = int(req.GET.get("maxAge", 60*60)) - q = self.compose_query(slice, site, node, timeField, avg, sum, count, computed, groupBy, orderBy) + cached = req.GET.get("cached", None) + cachedGroupBy = self.get_list_from_req(req, "cachedGroupBy", ["doesnotexist"]) + + filter={} + if slice: + filter["slice"] = slice + if site: + filter["site"] = site + if node: + filter["hostname"] = node + if event: + filter["event"] = event + if cp: + filter["cp"] = cp + + q = self.compose_query(filter, timeBucket, avg, sum, count, computed, [], groupBy, orderBy, maxAge=maxAge) print q - if (format=="raw"): + dataSourceUrl = "http://" + req.META["SERVER_NAME"] + ":" + req.META["SERVER_PORT"] + req.META["PATH_INFO"] + "?" + req.META["QUERY_STRING"].replace("format=","origFormat=").replace("%","%25") + "&format=charts"; + + if (format=="dataSourceUrl"): + result = {"dataSourceUrl": dataSourceUrl} + return ("application/javascript", result) + + elif (format=="raw"): result = self.run_query_raw(q) - result["reqId"] = 0 # XXX FIXME - return ("application/javascript", json.dumps(result)) + result["dataSourceUrl"] = dataSourceUrl + + result = json.dumps(result); + + return ("application/javascript", result) + + elif (format=="nodata"): + result = {"dataSourceUrl": dataSourceUrl, "query": q} + result = json.dumps(result); + return {"application/javascript", result} + + elif (format=="charts"): + bq_result = self.run_query_raw(q) + + # cloudscrutiny code is probably better! + table = {} + table["cols"] = self.schema_to_cols(bq_result["schema"]) + rows = [] + if "rows" in bq_result: + for row in bq_result["rows"]: + rowcols = [] + for (colnum,col) in enumerate(row["f"]): + if (colnum==0): + dt = datetime.datetime.fromtimestamp(float(col["v"])) + rowcols.append({"v": 'new Date("%s")' % dt.isoformat()}) + else: + try: + rowcols.append({"v": float(col["v"])}) + except: + rowcols.append({"v": col["v"]}) + rows.append({"c": rowcols}) + table["rows"] = rows + + if tqx: + reqId = tqx.strip("reqId:") + else: + reqId = "0" + + result = {"status": "okColumnChart", "reqId": reqId, "table": table, "version": "0.6"} + + result = "google.visualization.Query.setResponse(" + json.dumps(result) + ");" + + def unquote_it(x): return x.group()[1:-1].replace('\\"', '"') + + p = re.compile(r'"new Date\(\\"[^"]*\\"\)"') + result=p.sub(unquote_it, result) + + return ("application/javascript", result) + else: - result = self.run_query(q) + if cached: + results = self.get_cached_query_results(self.compose_cached_query(cached)) + + result = self.postprocess_results(results, filter=filter, sum=sum, count=count, avg=avg, computed=computed, maxDeltaTime=120, groupBy=cachedGroupBy) + else: + result = self.run_query(q) if maxRows: result = result[-int(maxRows):] - return self.format_result(format, result, q) + if mergeDataModelSites: + self.merge_datamodel_sites(result) + return self.format_result(format, result, q, dataSourceUrl) def DoPlanetStackAnalytics(request): bq = PlanetStackAnalytics() @@ -177,9 +430,31 @@ def DoPlanetStackAnalytics(request): return result def main(): - bq = PlanetStackAnalytics() + bq = PlanetStackAnalytics(tableName="demoevents") + + q = bq.compose_cached_query() + results = bq.run_query(q) + + #results = bq.postprocess_results(results, + # filter={"slice": "HyperCache"}, + # groupBy=["site"], + # computed=["bytes_sent/elapsed"], + # sum=["bytes_sent", "computed_bytes_sent_div_elapsed"], avg=["cpu"], + # maxDeltaTime=60) + + #results = bq.postprocess_results(results, filter={"slice": "HyperCache"}, maxi=["cpu"], count=["hostname"], computed=["bytes_sent/elapsed"], groupBy=["Time", "site"], maxDeltaTime=80) + + results = bq.postprocess_results(results,filter={"event": "libvirt_heartbeat"}, avg=["cpu"], count=["hostname"], groupBy=["doesnotexist"]) + + bq.dump_table(results) + + sys.exit(0) + + q=bq.compose_query(sum=["%bytes_sent"], avg=["%cpu"], latest=True, groupBy=["Time", "%site"]) + print q + bq.dump_table(bq.run_query(q)) - q=bq.compose_query(avg=["%cpu"], count=["%hostname"], slice="HyperCache") + q=bq.compose_query(avg=["%cpu","%bandwidth"], count=["%hostname"], slice="HyperCache") print q bq.dump_table(bq.run_query(q)) @@ -187,9 +462,8 @@ def main(): print print q bq.dump_table(bq.run_query(q)) - #print bq.run_query_raw(q) - q=bq.compose_query(timeField="HourTime", avg=["%cpu"], count=["%hostname"], computed=["%bytes_sent/%elapsed"], groupBy=["HourTime"], orderBy=["HourTime"]) + q=bq.compose_query(timeBucket=60*60, avg=["%cpu"], count=["%hostname"], computed=["%bytes_sent/%elapsed"]) print print q bq.dump_table(bq.run_query(q))