X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=planetstack%2Fhpc_wizard%2Fplanetstack_analytics.py;h=75462d4eb029b27fae4ceecacf7cdce97e3db7c6;hb=2123fde5ec74bdeab4fccf00b82b5cec37edf0b9;hp=8fee0dc014c30f42169e7fa101f03af5339bebae;hpb=70c18b708e1846ef996c70379b00fb8b008d1758;p=plstackapi.git diff --git a/planetstack/hpc_wizard/planetstack_analytics.py b/planetstack/hpc_wizard/planetstack_analytics.py index 8fee0dc..75462d4 100644 --- a/planetstack/hpc_wizard/planetstack_analytics.py +++ b/planetstack/hpc_wizard/planetstack_analytics.py @@ -1,4 +1,4 @@ -from bigquery_analytics import BigQueryAnalytics +from bigquery_analytics import BigQueryAnalytics, BIGQUERY_AVAILABLE import datetime import re import os @@ -14,6 +14,7 @@ else: sys.path.append("/opt/planetstack") os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings") +from django.conf import settings from django import db from django.db import connection from core.models import Slice, Sliver, ServiceClass, Reservation, Tag, Network, User, Node, Image, Deployment, Site, NetworkTemplate, NetworkSlice, Service @@ -24,7 +25,10 @@ RED_LOAD=15000000 glo_cached_queries = {} class PlanetStackAnalytics(BigQueryAnalytics): - def __init__(self, tableName="demoevents"): + def __init__(self, tableName=None): + if not tableName: + tableName = settings.BIGQUERY_TABLE + BigQueryAnalytics.__init__(self, tableName) def service_to_sliceNames(self, serviceName): @@ -38,8 +42,12 @@ class PlanetStackAnalytics(BigQueryAnalytics): return [slice.name for slice in slices] - def compose_query(self, slice=None, site=None, node=None, service=None, timeBucket="60", avg=[], sum=[], count=[], computed=[], val=[], groupBy=["Time"], orderBy=["Time"], tableName="demoevents", latest=False): - tablePart = "[%s.%s@-3600000--1]" % ("vicci", tableName) + def compose_query(self, filter={}, timeBucket="60", avg=[], sum=[], count=[], computed=[], val=[], groupBy=["Time"], orderBy=["Time"], tableName=None, latest=False, maxAge=60*60): + if tableName is None: + tableName = self.tableName + + maxAge = maxAge * 1000 + tablePart = "[%s.%s@-%d--1]" % ("vicci", tableName, maxAge) fields = [] fieldNames = [] @@ -91,14 +99,16 @@ class PlanetStackAnalytics(BigQueryAnalytics): where = [] - if slice: - where.append("%%slice='%s'" % slice) - if site: - where.append("%%site='%s'" % site) - if node: - where.append("%%hostname='%s'" % node) - if service: - sliceNames = self.service_to_sliceNames(service) + if filter.get("slice",None): + where.append("%%slice='%s'" % filter["slice"]) + if filter.get("site",None): + where.append("%%site='%s'" % filter["site"]) + if filter.get("node",None): + where.append("%%hostname='%s'" % filter["node"]) + if filter.get("event",None): + where.append("event='%s'" % filter["event"]) + if filter.get("service",None): + sliceNames = self.service_to_sliceNames(filter["service"]) if sliceNames: where.append("(" + " OR ".join(["%%slice='%s'" % sliceName for sliceName in sliceNames]) +")") @@ -170,8 +180,13 @@ class PlanetStackAnalytics(BigQueryAnalytics): return value.split(",") def format_result(self, format, result, query, dataSourceUrl): + if not BIGQUERY_AVAILABLE: + msg = "BigQuery Statistics Unavaiable" + else: + msg = None + if (format == "json_dicts"): - result = {"query": query, "rows": result, "dataSourceUrl": dataSourceUrl} + result = {"query": query, "rows": result, "dataSourceUrl": dataSourceUrl, "msg": msg} return ("application/javascript", json.dumps(result)) elif (format == "json_arrays"): @@ -181,7 +196,7 @@ class PlanetStackAnalytics(BigQueryAnalytics): for key in sorted(row.keys()): new_row.append(row[key]) new_result.append(new_row) - new_result = {"query": query, "rows": new_result} + new_result = {"query": query, "rows": new_result, "msg": msg} return ("application/javascript", json.dumps(new_result)) elif (format == "html_table"): @@ -196,10 +211,17 @@ class PlanetStackAnalytics(BigQueryAnalytics): return ("text/html", new_result) - def merge_datamodel_sites(self, rows): + def merge_datamodel_sites(self, rows, slice=None): """ For a query that included "site" in its groupby, merge in the opencloud site information. """ + + if slice: + try: + slice = Slice.objects.get(name=slice) + except: + slice = None + for row in rows: sitename = row["site"] try: @@ -208,22 +230,45 @@ class PlanetStackAnalytics(BigQueryAnalytics): # we didn't find it in the data model continue + allocated_slivers = 0 + if model_site and slice: + for sliver in slice.slivers.all(): + if sliver.node.site == model_site: + allocated_slivers = allocated_slivers + 1 + row["lat"] = float(model_site.location.latitude) row["long"] = float(model_site.location.longitude) row["url"] = model_site.site_url row["numNodes"] = model_site.nodes.count() + row["allocated_slivers"] = allocated_slivers max_cpu = row.get("max_avg_cpu", row.get("max_cpu",0)) cpu=float(max_cpu)/100.0 row["hotness"] = max(0.0, ((cpu*RED_LOAD) - BLUE_LOAD)/(RED_LOAD-BLUE_LOAD)) - def compose_latest_query(self, fieldNames=None, groupByFields=["%hostname", "event"]): + def compose_cached_query(self, querySpec='default'): """ Compose a query that returns the 'most recent' row for each (hostname, event) pair. + + Note that groupByFields cannot contain any values that are 'Null' or those + rows will be excluded. For example, if groupByFields includes cp, then + there will be no libvirt_event rows, since libvirt_event does not have + cp. + + This means we can't really have 'one query to rule them'. Settle on + having a couple of different queries, and have the caller specify + which one he wants. """ - if not fieldNames: - fieldNames = ["%hostname", "%bytes_sent", "time", "event", "%site", "%elapsed", "%slice", "%cpu"] + fieldNames = ["%hostname", "%bytes_sent", "%bytes_hit", "%healthy", "time", "event", "%site", "%elapsed", "%cpu"] + + if querySpec=="default": + groupByFields = ["%hostname", "event"] + elif (querySpec=="hpc"): + fieldNames.append("%cp") + groupByFields = ["%hostname", "event", "%cp"] + else: + raise ValueError("Unknown queryspec %s" % querySpec) fields = ["table1.%s AS %s" % (x,x) for x in fieldNames] fields = ", ".join(fields) @@ -242,7 +287,7 @@ class PlanetStackAnalytics(BigQueryAnalytics): return base_query - def get_cached_query_results(self, q): + def get_cached_query_results(self, q, wait=True): global glo_cached_queries if q in glo_cached_queries: @@ -250,6 +295,9 @@ class PlanetStackAnalytics(BigQueryAnalytics): print "using cached query" return glo_cached_queries[q]["rows"] + if not wait: + return None + print "refreshing cached query" result = self.run_query(q) glo_cached_queries[q] = {"time": time.time(), "rows": result} @@ -265,10 +313,12 @@ class PlanetStackAnalytics(BigQueryAnalytics): site = req.GET.get("site", None) node = req.GET.get("node", None) service = req.GET.get("service", None) + event = req.GET.get("event", "libvirt_heartbeat") + cp = req.GET.get("cp", None) format = req.GET.get("format", "json_dicts") - timeField = req.GET.get("timeBucket", "60") + timeBucket = int(req.GET.get("timeBucket", 60)) avg = self.get_list_from_req(req, "avg") sum = self.get_list_from_req(req, "sum") count = self.get_list_from_req(req, "count") @@ -279,9 +329,24 @@ class PlanetStackAnalytics(BigQueryAnalytics): maxRows = req.GET.get("maxRows", None) mergeDataModelSites = req.GET.get("mergeDataModelSites", None) + maxAge = int(req.GET.get("maxAge", 60*60)) + cached = req.GET.get("cached", None) + cachedGroupBy = self.get_list_from_req(req, "cachedGroupBy", ["doesnotexist"]) + + filter={} + if slice: + filter["slice"] = slice + if site: + filter["site"] = site + if node: + filter["hostname"] = node + if event: + filter["event"] = event + if cp: + filter["cp"] = cp - q = self.compose_query(slice, site, node, service, timeField, avg, sum, count, computed, [], groupBy, orderBy) + q = self.compose_query(filter, timeBucket, avg, sum, count, computed, [], groupBy, orderBy, maxAge=maxAge) print q @@ -311,18 +376,19 @@ class PlanetStackAnalytics(BigQueryAnalytics): table = {} table["cols"] = self.schema_to_cols(bq_result["schema"]) rows = [] - for row in bq_result["rows"]: - rowcols = [] - for (colnum,col) in enumerate(row["f"]): - if (colnum==0): - dt = datetime.datetime.fromtimestamp(float(col["v"])) - rowcols.append({"v": 'new Date("%s")' % dt.isoformat()}) - else: - try: - rowcols.append({"v": float(col["v"])}) - except: - rowcols.append({"v": col["v"]}) - rows.append({"c": rowcols}) + if "rows" in bq_result: + for row in bq_result["rows"]: + rowcols = [] + for (colnum,col) in enumerate(row["f"]): + if (colnum==0): + dt = datetime.datetime.fromtimestamp(float(col["v"])) + rowcols.append({"v": 'new Date("%s")' % dt.isoformat()}) + else: + try: + rowcols.append({"v": float(col["v"])}) + except: + rowcols.append({"v": col["v"]}) + rows.append({"c": rowcols}) table["rows"] = rows if tqx: @@ -343,17 +409,9 @@ class PlanetStackAnalytics(BigQueryAnalytics): else: if cached: - results = self.get_cached_query_results(self.compose_latest_query()) - - filter={} - if slice: - filter["slice"] = slice - if site: - filter["site"] = site - if node: - filter["hostname"] = node + results = self.get_cached_query_results(self.compose_cached_query(cached)) - result = self.postprocess_results(results, filter=filter, sum=sum, count=count, avg=avg, computed=computed, maxDeltaTime=120, groupBy=["doesnotexist"]) + result = self.postprocess_results(results, filter=filter, sum=sum, count=count, avg=avg, computed=computed, maxDeltaTime=120, groupBy=cachedGroupBy) else: result = self.run_query(q) @@ -372,26 +430,30 @@ def DoPlanetStackAnalytics(request): return result def main(): - bq = PlanetStackAnalytics() + bq = PlanetStackAnalytics(tableName="demoevents") - q = bq.compose_latest_query() + q = bq.compose_cached_query() results = bq.run_query(q) - results = bq.postprocess_results(results, - #filter={"site": "Princeton"}, - groupBy=["site"], - computed=["bytes_sent/elapsed"], - sum=["bytes_sent", "computed_bytes_sent_div_elapsed"], avg=["cpu"], - maxDeltaTime=60) + #results = bq.postprocess_results(results, + # filter={"slice": "HyperCache"}, + # groupBy=["site"], + # computed=["bytes_sent/elapsed"], + # sum=["bytes_sent", "computed_bytes_sent_div_elapsed"], avg=["cpu"], + # maxDeltaTime=60) + + #results = bq.postprocess_results(results, filter={"slice": "HyperCache"}, maxi=["cpu"], count=["hostname"], computed=["bytes_sent/elapsed"], groupBy=["Time", "site"], maxDeltaTime=80) + + results = bq.postprocess_results(results,filter={"event": "libvirt_heartbeat"}, avg=["cpu"], count=["hostname"], groupBy=["doesnotexist"]) bq.dump_table(results) + sys.exit(0) + q=bq.compose_query(sum=["%bytes_sent"], avg=["%cpu"], latest=True, groupBy=["Time", "%site"]) print q bq.dump_table(bq.run_query(q)) - sys.exit(0) - q=bq.compose_query(avg=["%cpu","%bandwidth"], count=["%hostname"], slice="HyperCache") print q bq.dump_table(bq.run_query(q))