X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=planetstack%2Fhpc_wizard%2Fplanetstack_analytics.py;h=75462d4eb029b27fae4ceecacf7cdce97e3db7c6;hb=7121285de503199c2d94c057b7c24fd62414b650;hp=2afacb3ac188a9799c12f5538cdbd0c76a614917;hpb=720a1599c39adf66abaa6de2b5f6a3ec2d26bd66;p=plstackapi.git diff --git a/planetstack/hpc_wizard/planetstack_analytics.py b/planetstack/hpc_wizard/planetstack_analytics.py index 2afacb3..75462d4 100644 --- a/planetstack/hpc_wizard/planetstack_analytics.py +++ b/planetstack/hpc_wizard/planetstack_analytics.py @@ -1,4 +1,4 @@ -from bigquery_analytics import BigQueryAnalytics +from bigquery_analytics import BigQueryAnalytics, BIGQUERY_AVAILABLE import datetime import re import os @@ -14,6 +14,7 @@ else: sys.path.append("/opt/planetstack") os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings") +from django.conf import settings from django import db from django.db import connection from core.models import Slice, Sliver, ServiceClass, Reservation, Tag, Network, User, Node, Image, Deployment, Site, NetworkTemplate, NetworkSlice, Service @@ -24,7 +25,10 @@ RED_LOAD=15000000 glo_cached_queries = {} class PlanetStackAnalytics(BigQueryAnalytics): - def __init__(self, tableName="demoevents"): + def __init__(self, tableName=None): + if not tableName: + tableName = settings.BIGQUERY_TABLE + BigQueryAnalytics.__init__(self, tableName) def service_to_sliceNames(self, serviceName): @@ -38,9 +42,12 @@ class PlanetStackAnalytics(BigQueryAnalytics): return [slice.name for slice in slices] - def compose_query(self, slice=None, site=None, node=None, service=None, timeBucket="60", avg=[], sum=[], count=[], computed=[], val=[], groupBy=["Time"], orderBy=["Time"], tableName="demoevents", latest=False, max_age=60*60): - max_age = max_age * 1000 - tablePart = "[%s.%s@-%d--1]" % ("vicci", tableName, max_age) + def compose_query(self, filter={}, timeBucket="60", avg=[], sum=[], count=[], computed=[], val=[], groupBy=["Time"], orderBy=["Time"], tableName=None, latest=False, maxAge=60*60): + if tableName is None: + tableName = self.tableName + + maxAge = maxAge * 1000 + tablePart = "[%s.%s@-%d--1]" % ("vicci", tableName, maxAge) fields = [] fieldNames = [] @@ -92,14 +99,16 @@ class PlanetStackAnalytics(BigQueryAnalytics): where = [] - if slice: - where.append("%%slice='%s'" % slice) - if site: - where.append("%%site='%s'" % site) - if node: - where.append("%%hostname='%s'" % node) - if service: - sliceNames = self.service_to_sliceNames(service) + if filter.get("slice",None): + where.append("%%slice='%s'" % filter["slice"]) + if filter.get("site",None): + where.append("%%site='%s'" % filter["site"]) + if filter.get("node",None): + where.append("%%hostname='%s'" % filter["node"]) + if filter.get("event",None): + where.append("event='%s'" % filter["event"]) + if filter.get("service",None): + sliceNames = self.service_to_sliceNames(filter["service"]) if sliceNames: where.append("(" + " OR ".join(["%%slice='%s'" % sliceName for sliceName in sliceNames]) +")") @@ -171,8 +180,13 @@ class PlanetStackAnalytics(BigQueryAnalytics): return value.split(",") def format_result(self, format, result, query, dataSourceUrl): + if not BIGQUERY_AVAILABLE: + msg = "BigQuery Statistics Unavaiable" + else: + msg = None + if (format == "json_dicts"): - result = {"query": query, "rows": result, "dataSourceUrl": dataSourceUrl} + result = {"query": query, "rows": result, "dataSourceUrl": dataSourceUrl, "msg": msg} return ("application/javascript", json.dumps(result)) elif (format == "json_arrays"): @@ -182,7 +196,7 @@ class PlanetStackAnalytics(BigQueryAnalytics): for key in sorted(row.keys()): new_row.append(row[key]) new_result.append(new_row) - new_result = {"query": query, "rows": new_result} + new_result = {"query": query, "rows": new_result, "msg": msg} return ("application/javascript", json.dumps(new_result)) elif (format == "html_table"): @@ -232,13 +246,29 @@ class PlanetStackAnalytics(BigQueryAnalytics): cpu=float(max_cpu)/100.0 row["hotness"] = max(0.0, ((cpu*RED_LOAD) - BLUE_LOAD)/(RED_LOAD-BLUE_LOAD)) - def compose_latest_query(self, fieldNames=None, groupByFields=["%hostname", "event"]): + def compose_cached_query(self, querySpec='default'): """ Compose a query that returns the 'most recent' row for each (hostname, event) pair. + + Note that groupByFields cannot contain any values that are 'Null' or those + rows will be excluded. For example, if groupByFields includes cp, then + there will be no libvirt_event rows, since libvirt_event does not have + cp. + + This means we can't really have 'one query to rule them'. Settle on + having a couple of different queries, and have the caller specify + which one he wants. """ - if not fieldNames: - fieldNames = ["%hostname", "%bytes_sent", "time", "event", "%site", "%elapsed", "%slice", "%cpu"] + fieldNames = ["%hostname", "%bytes_sent", "%bytes_hit", "%healthy", "time", "event", "%site", "%elapsed", "%cpu"] + + if querySpec=="default": + groupByFields = ["%hostname", "event"] + elif (querySpec=="hpc"): + fieldNames.append("%cp") + groupByFields = ["%hostname", "event", "%cp"] + else: + raise ValueError("Unknown queryspec %s" % querySpec) fields = ["table1.%s AS %s" % (x,x) for x in fieldNames] fields = ", ".join(fields) @@ -283,10 +313,12 @@ class PlanetStackAnalytics(BigQueryAnalytics): site = req.GET.get("site", None) node = req.GET.get("node", None) service = req.GET.get("service", None) + event = req.GET.get("event", "libvirt_heartbeat") + cp = req.GET.get("cp", None) format = req.GET.get("format", "json_dicts") - timeField = req.GET.get("timeBucket", "60") + timeBucket = int(req.GET.get("timeBucket", 60)) avg = self.get_list_from_req(req, "avg") sum = self.get_list_from_req(req, "sum") count = self.get_list_from_req(req, "count") @@ -297,9 +329,24 @@ class PlanetStackAnalytics(BigQueryAnalytics): maxRows = req.GET.get("maxRows", None) mergeDataModelSites = req.GET.get("mergeDataModelSites", None) + maxAge = int(req.GET.get("maxAge", 60*60)) + cached = req.GET.get("cached", None) + cachedGroupBy = self.get_list_from_req(req, "cachedGroupBy", ["doesnotexist"]) - q = self.compose_query(slice, site, node, service, timeField, avg, sum, count, computed, [], groupBy, orderBy) + filter={} + if slice: + filter["slice"] = slice + if site: + filter["site"] = site + if node: + filter["hostname"] = node + if event: + filter["event"] = event + if cp: + filter["cp"] = cp + + q = self.compose_query(filter, timeBucket, avg, sum, count, computed, [], groupBy, orderBy, maxAge=maxAge) print q @@ -362,17 +409,9 @@ class PlanetStackAnalytics(BigQueryAnalytics): else: if cached: - results = self.get_cached_query_results(self.compose_latest_query()) - - filter={} - if slice: - filter["slice"] = slice - if site: - filter["site"] = site - if node: - filter["hostname"] = node + results = self.get_cached_query_results(self.compose_cached_query(cached)) - result = self.postprocess_results(results, filter=filter, sum=sum, count=count, avg=avg, computed=computed, maxDeltaTime=120, groupBy=["doesnotexist"]) + result = self.postprocess_results(results, filter=filter, sum=sum, count=count, avg=avg, computed=computed, maxDeltaTime=120, groupBy=cachedGroupBy) else: result = self.run_query(q) @@ -391,9 +430,9 @@ def DoPlanetStackAnalytics(request): return result def main(): - bq = PlanetStackAnalytics() + bq = PlanetStackAnalytics(tableName="demoevents") - q = bq.compose_latest_query(groupByFields=["%hostname", "event", "%slice"]) + q = bq.compose_cached_query() results = bq.run_query(q) #results = bq.postprocess_results(results, @@ -403,7 +442,9 @@ def main(): # sum=["bytes_sent", "computed_bytes_sent_div_elapsed"], avg=["cpu"], # maxDeltaTime=60) - results = bq.postprocess_results(results, filter={"slice": "HyperCache"}, maxi=["cpu"], count=["hostname"], computed=["bytes_sent/elapsed"], groupBy=["Time", "site"], maxDeltaTime=80) + #results = bq.postprocess_results(results, filter={"slice": "HyperCache"}, maxi=["cpu"], count=["hostname"], computed=["bytes_sent/elapsed"], groupBy=["Time", "site"], maxDeltaTime=80) + + results = bq.postprocess_results(results,filter={"event": "libvirt_heartbeat"}, avg=["cpu"], count=["hostname"], groupBy=["doesnotexist"]) bq.dump_table(results)