From: Scott Baker
Date: Fri, 18 Apr 2014 17:45:26 +0000 (-0700)
Subject: cached query support, latest arg to compose_query, compose_latest_query
X-Git-Url: http://git.onelab.eu/?a=commitdiff_plain;h=95b28d6f52523daef64069e6580a6479ca82a700;p=plstackapi.git

cached query support, latest arg to compose_query, compose_latest_query
---

diff --git a/planetstack/hpc_wizard/planetstack_analytics.py b/planetstack/hpc_wizard/planetstack_analytics.py
index e75e253..8fee0dc 100644
--- a/planetstack/hpc_wizard/planetstack_analytics.py
+++ b/planetstack/hpc_wizard/planetstack_analytics.py
@@ -3,6 +3,7 @@ import datetime
 import re
 import os
 import sys
+import time
 import json
 import traceback
 import urllib2
@@ -20,6 +21,8 @@ from core.models import Slice, Sliver, ServiceClass, Reservation, Tag, Network,
 BLUE_LOAD=5000000
 RED_LOAD=15000000

+glo_cached_queries = {}
+
 class PlanetStackAnalytics(BigQueryAnalytics):
     def __init__(self, tableName="demoevents"):
         BigQueryAnalytics.__init__(self, tableName)
@@ -35,11 +38,12 @@ class PlanetStackAnalytics(BigQueryAnalytics):

         return [slice.name for slice in slices]

-    def compose_query(self, slice=None, site=None, node=None, service=None, timeBucket="60", avg=[], sum=[], count=[], computed=[], groupBy=["Time"], orderBy=["Time"], tableName="demoevents"):
-        tablePart = "%s.%s@-3600000--1" % ("vicci", tableName)
+    def compose_query(self, slice=None, site=None, node=None, service=None, timeBucket="60", avg=[], sum=[], count=[], computed=[], val=[], groupBy=["Time"], orderBy=["Time"], tableName="demoevents", latest=False):
+        tablePart = "[%s.%s@-3600000--1]" % ("vicci", tableName)

         fields = []
         fieldNames = []
+        srcFieldNames = ["time"]

         fields.append("SEC_TO_TIMESTAMP(INTEGER(TIMESTAMP_TO_SEC(time)/%s)*%s) as Time" % (str(timeBucket),str(timeBucket)))
         #fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/%s)*%s as Time" % (str(timeBucket),str(timeBucket)))
@@ -47,14 +51,22 @@ class PlanetStackAnalytics(BigQueryAnalytics):
         for fieldName in avg:
             fields.append("AVG(%s) as avg_%s" % (fieldName, fieldName.replace("%","")))
             fieldNames.append("avg_%s" % fieldName.replace("%",""))
+            srcFieldNames.append(fieldName)

         for fieldName in sum:
             fields.append("SUM(%s) as sum_%s" % (fieldName, fieldName.replace("%","")))
             fieldNames.append("sum_%s" % fieldName.replace("%",""))
+            srcFieldNames.append(fieldName)

         for fieldName in count:
             fields.append("COUNT(distinct %s) as count_%s" % (fieldName, fieldName.replace("%","")))
             fieldNames.append("count_%s" % fieldName.replace("%",""))
+            srcFieldNames.append(fieldName)
+
+        for fieldName in val:
+            fields.append(fieldName)
+            fieldNames.append(fieldName)
+            srcFieldNames.append(fieldName)

         for fieldName in computed:
             operator = "/"
@@ -66,11 +78,14 @@ class PlanetStackAnalytics(BigQueryAnalytics):
             computedFieldName = "computed_" + parts[0].replace("%","")+"_mult_"+parts[1].replace("%","")
             fields.append("SUM(%s)%sSUM(%s) as %s" % (parts[0], operator, parts[1], computedFieldName))
             fieldNames.append(computedFieldName)
+            srcFieldNames.append(parts[0])
+            srcFieldNames.append(parts[1])

         for fieldName in groupBy:
             if (fieldName not in ["Time"]):
                 fields.append(fieldName)
                 fieldNames.append(fieldName)
+                srcFieldNames.append(fieldName)

         fields = ", ".join(fields)
@@ -104,12 +119,20 @@
         else:
             orderBy = ""

+        if latest:
+            latestFields = ["table1.%s as %s" % (x,x) for x in srcFieldNames]
+            latestFields = ", ".join(latestFields)
+            tablePart = """(SELECT %s FROM %s AS table1
+                    JOIN
+                        (SELECT %%hostname, event, max(time) as maxtime from %s GROUP BY %%hostname, event) AS latest
+                    ON
+                        table1.%%hostname = latest.%%hostname AND table1.event = latest.event AND table1.time = latest.maxtime)""" % (latestFields, tablePart, tablePart)
+
         if computed:
-            subQuery = "SELECT %%hostname, %s FROM [%s]" % (fields, tablePart)
+            subQuery = "SELECT %%hostname, %s FROM %s" % (fields, tablePart)
             if where:
                 subQuery = subQuery + where
             subQuery = subQuery + groupBySub
-            #subQuery = subQuery + " GROUP BY %s,%%hostname" % timeField

             sumFields = []
             for fieldName in fieldNames:
@@ -129,7 +152,7 @@
             if orderBy:
                 query = query + orderBy
         else:
-            query = "SELECT %s FROM [%s]" % (fields, tablePart)
+            query = "SELECT %s FROM %s" % (fields, tablePart)
             if where:
                 query = query + " " + where
             if groupBy:
@@ -173,28 +196,6 @@

             return ("text/html", new_result)

-        elif (format == "json_hpcdash"):
-            new_rows = {}
-            for row in result:
-                new_row = {"lat": float(row.get("lat", 0)),
-                           "long": float(row.get("long", 0)),
-                           "health": 0,
-                           "numNodes": int(row.get("numNodes",0)),
-                           "numHPCSlivers": int(row.get("sum_count_hostname", 0)),
-                           "siteUrl": row.get("url", ""),
-                           "hot": float(row.get("hotness", 0.0)),
-                           "load": int(float(row.get("max_avg_cpu", 0)))}
-                new_rows[row["site"]] = new_row
-            return ("application/javascript", json.dumps(new_rows))
-
-    def only_largest(self, rows, fieldName):
-        """ Given a fieldName, only return the set of rows that had the
-            maximum value of that fieldName.
-        """
-        maxVal = max( [int(row[fieldName]) for row in rows] )
-        new_rows = [row for row in rows if int(row[fieldName])==maxVal]
-        return new_rows
-
     def merge_datamodel_sites(self, rows):
         """ For a query that included "site" in its groupby, merge in the
             opencloud site information.
@@ -212,9 +213,48 @@
                 row["url"] = model_site.site_url
                 row["numNodes"] = model_site.nodes.count()

-                if "max_avg_cpu" in row:
-                    cpu=float(row["max_avg_cpu"])/100.0
-                    row["hotness"] = max(0.0, ((cpu*RED_LOAD) - BLUE_LOAD)/(RED_LOAD-BLUE_LOAD))
+                max_cpu = row.get("max_avg_cpu", row.get("max_cpu",0))
+                cpu=float(max_cpu)/100.0
+                row["hotness"] = max(0.0, ((cpu*RED_LOAD) - BLUE_LOAD)/(RED_LOAD-BLUE_LOAD))
+
+    def compose_latest_query(self, fieldNames=None, groupByFields=["%hostname", "event"]):
+        """ Compose a query that returns the 'most recent' row for each (hostname, event)
+            pair.
+ """ + + if not fieldNames: + fieldNames = ["%hostname", "%bytes_sent", "time", "event", "%site", "%elapsed", "%slice", "%cpu"] + + fields = ["table1.%s AS %s" % (x,x) for x in fieldNames] + fields = ", ".join(fields) + + tableDesc = "%s.%s" % (self.projectName, self.tableName) + + groupByOn = ["table1.time = latest.maxtime"] + for field in groupByFields: + groupByOn.append("table1.%s = latest.%s" % (field, field)) + + groupByOn = " AND ".join(groupByOn) + groupByFields = ", ".join(groupByFields) + + base_query = "SELECT %s FROM [%s@-3600000--1] AS table1 JOIN (SELECT %s, max(time) as maxtime from [%s@-3600000--1] GROUP BY %s) AS latest ON %s" % \ + (fields, tableDesc, groupByFields, tableDesc, groupByFields, groupByOn) + + return base_query + + def get_cached_query_results(self, q): + global glo_cached_queries + + if q in glo_cached_queries: + if (time.time() - glo_cached_queries[q]["time"]) <= 60: + print "using cached query" + return glo_cached_queries[q]["rows"] + + print "refreshing cached query" + result = self.run_query(q) + glo_cached_queries[q] = {"time": time.time(), "rows": result} + + return result def process_request(self, req): print req.GET @@ -237,12 +277,14 @@ class PlanetStackAnalytics(BigQueryAnalytics): orderBy = self.get_list_from_req(req, "orderBy", ["Time"]) maxRows = req.GET.get("maxRows", None) - onlyLargest = req.GET.get("onlyLargest", None) mergeDataModelSites = req.GET.get("mergeDataModelSites", None) - q = self.compose_query(slice, site, node, service, timeField, avg, sum, count, computed, groupBy, orderBy) + cached = req.GET.get("cached", None) + + q = self.compose_query(slice, site, node, service, timeField, avg, sum, count, computed, [], groupBy, orderBy) print q + dataSourceUrl = "http://" + req.META["SERVER_NAME"] + ":" + req.META["SERVER_PORT"] + req.META["PATH_INFO"] + "?" 
+ req.META["QUERY_STRING"].replace("format=","origFormat=").replace("%","%25") + "&format=charts"; if (format=="dataSourceUrl"): @@ -257,6 +299,11 @@ class PlanetStackAnalytics(BigQueryAnalytics): return ("application/javascript", result) + elif (format=="nodata"): + result = {"dataSourceUrl": dataSourceUrl, "query": q} + result = json.dumps(result); + return {"application/javascript", result} + elif (format=="charts"): bq_result = self.run_query_raw(q) @@ -295,20 +342,29 @@ class PlanetStackAnalytics(BigQueryAnalytics): return ("application/javascript", result) else: - result = self.run_query(q) + if cached: + results = self.get_cached_query_results(self.compose_latest_query()) + + filter={} + if slice: + filter["slice"] = slice + if site: + filter["site"] = site + if node: + filter["hostname"] = node + + result = self.postprocess_results(results, filter=filter, sum=sum, count=count, avg=avg, computed=computed, maxDeltaTime=120, groupBy=["doesnotexist"]) + else: + result = self.run_query(q) - if onlyLargest: - result = self.only_largest(result, onlyLargest) + if maxRows: + result = result[-int(maxRows):] if mergeDataModelSites: self.merge_datamodel_sites(result) - if maxRows: - result = result[-int(maxRows):] - return self.format_result(format, result, q, dataSourceUrl) - def DoPlanetStackAnalytics(request): bq = PlanetStackAnalytics() result = bq.process_request(request) @@ -318,6 +374,24 @@ def DoPlanetStackAnalytics(request): def main(): bq = PlanetStackAnalytics() + q = bq.compose_latest_query() + results = bq.run_query(q) + + results = bq.postprocess_results(results, + #filter={"site": "Princeton"}, + groupBy=["site"], + computed=["bytes_sent/elapsed"], + sum=["bytes_sent", "computed_bytes_sent_div_elapsed"], avg=["cpu"], + maxDeltaTime=60) + + bq.dump_table(results) + + q=bq.compose_query(sum=["%bytes_sent"], avg=["%cpu"], latest=True, groupBy=["Time", "%site"]) + print q + bq.dump_table(bq.run_query(q)) + + sys.exit(0) + q=bq.compose_query(avg=["%cpu","%bandwidth"], count=["%hostname"], slice="HyperCache") print q bq.dump_table(bq.run_query(q)) @@ -332,15 +406,6 @@ def main(): print q bq.dump_table(bq.run_query(q)) - q=bq.compose_query(avg=["%cpu"], count=["%hostname"], computed=["%bytes_sent/%elapsed"], service="HPC Service", groupBy=["Time","%site"]) - print - print q - result=bq.run_query(q) - result = bq.only_largest(result, "Time") - bq.merge_datamodel_sites(result) - #bq.dump_table(result) - print bq.format_result("json_hpcdash", result, q) - if __name__ == "__main__": main()
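---

A note on the caching scheme this patch introduces: get_cached_query_results()
keys the module-level glo_cached_queries dict by the exact query string and
reuses a result for up to 60 seconds before re-running it. Below is a minimal
standalone sketch of that pattern; the runner argument is a hypothetical
stand-in for self.run_query (the real method calls BigQuery), and the TTL
mirrors the 60-second value hard-coded above.

    import time

    CACHE_TTL = 60   # seconds; matches the value hard-coded in get_cached_query_results
    _cache = {}      # query string -> {"time": fetch time, "rows": result rows}

    def get_cached(q, runner):
        # Reuse the cached rows while the entry is still fresh.
        entry = _cache.get(q)
        if entry and (time.time() - entry["time"]) <= CACHE_TTL:
            return entry["rows"]
        # Stale or missing: run the query and remember when we did.
        rows = runner(q)
        _cache[q] = {"time": time.time(), "rows": rows}
        return rows

    # Example with a fake runner in place of a real BigQuery call:
    print get_cached("SELECT 1", lambda q: [{"value": 1}])
    print get_cached("SELECT 1", lambda q: [{"value": 2}])   # served from cache

The latest=True path in compose_query() and compose_latest_query() both lean on
the same BigQuery idiom: join the table against a subselect of
(hostname, event, max(time)) so that only the most recent row per
(hostname, event) pair survives into the outer query.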