-from bigquery_analytics import BigQueryAnalytics
+from bigquery_analytics import BigQueryAnalytics, BIGQUERY_AVAILABLE
import datetime
import re
import os
+import sys
+import json
+import time
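+# Make the PlanetStack Django project importable and load its settings
+# before the core.models imports below.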
sys.path.append("/opt/planetstack")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings")
+from django.conf import settings
from django import db
from django.db import connection
from core.models import Slice, Sliver, ServiceClass, Reservation, Tag, Network, User, Node, Image, Deployment, Site, NetworkTemplate, NetworkSlice, Service
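+# Module-level cache of query results, keyed by the query string; each entry
+# stores {"time": <when it was fetched>, "rows": <result rows>}.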
glo_cached_queries = {}
class PlanetStackAnalytics(BigQueryAnalytics):
- def __init__(self, tableName="demoevents"):
+ def __init__(self, tableName=None):
+ if not tableName:
+ tableName = settings.BIGQUERY_TABLE
+
BigQueryAnalytics.__init__(self, tableName)
    def service_to_sliceNames(self, serviceName):
        # the "slices" related name on Service is assumed here
        slices = Service.objects.get(name=serviceName).slices.all()
        return [slice.name for slice in slices]
- def compose_query(self, slice=None, site=None, node=None, service=None, timeBucket="60", avg=[], sum=[], count=[], computed=[], val=[], groupBy=["Time"], orderBy=["Time"], tableName="demoevents", latest=False):
- tablePart = "[%s.%s@-3600000--1]" % ("vicci", tableName)
+ def compose_query(self, filter={}, timeBucket="60", avg=[], sum=[], count=[], computed=[], val=[], groupBy=["Time"], orderBy=["Time"], tableName=None, latest=False, maxAge=60*60):
+ if tableName is None:
+ tableName = self.tableName
+
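+        # BigQuery table decorators take relative offsets in milliseconds,
+        # so convert maxAge (seconds) before building the [@-<ms>--1] range.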
+ maxAge = maxAge * 1000
+ tablePart = "[%s.%s@-%d--1]" % ("vicci", tableName, maxAge)
fields = []
fieldNames = []
where = []
- if slice:
- where.append("%%slice='%s'" % slice)
- if site:
- where.append("%%site='%s'" % site)
- if node:
- where.append("%%hostname='%s'" % node)
- if service:
- sliceNames = self.service_to_sliceNames(service)
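+        # Each %-prefixed name (e.g. %slice) is a placeholder that
+        # BigQueryAnalytics later maps onto the real BigQuery column names;
+        # the doubled %% survives this method's own string formatting.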
+ if filter.get("slice",None):
+ where.append("%%slice='%s'" % filter["slice"])
+ if filter.get("site",None):
+ where.append("%%site='%s'" % filter["site"])
+        # the request handler stores node filters under "hostname"; accept either key
+        hostname = filter.get("node", filter.get("hostname", None))
+        if hostname:
+            where.append("%%hostname='%s'" % hostname)
+ if filter.get("event",None):
+ where.append("event='%s'" % filter["event"])
+ if filter.get("service",None):
+ sliceNames = self.service_to_sliceNames(filter["service"])
if sliceNames:
where.append("(" + " OR ".join(["%%slice='%s'" % sliceName for sliceName in sliceNames]) +")")
return value.split(",")
def format_result(self, format, result, query, dataSourceUrl):
+ if not BIGQUERY_AVAILABLE:
+            msg = "BigQuery Statistics Unavailable"
+ else:
+ msg = None
+
if (format == "json_dicts"):
- result = {"query": query, "rows": result, "dataSourceUrl": dataSourceUrl}
+ result = {"query": query, "rows": result, "dataSourceUrl": dataSourceUrl, "msg": msg}
return ("application/javascript", json.dumps(result))
elif (format == "json_arrays"):
for key in sorted(row.keys()):
new_row.append(row[key])
new_result.append(new_row)
- new_result = {"query": query, "rows": new_result}
+ new_result = {"query": query, "rows": new_result, "msg": msg}
return ("application/javascript", json.dumps(new_result))
elif (format == "html_table"):
return ("text/html", new_result)
- def merge_datamodel_sites(self, rows):
+ def merge_datamodel_sites(self, rows, slice=None):
""" For a query that included "site" in its groupby, merge in the
opencloud site information.
"""
+
+ if slice:
+            try:
+                slice = Slice.objects.get(name=slice)
+            except Slice.DoesNotExist:
+                slice = None
+
for row in rows:
sitename = row["site"]
            try:
                model_site = Site.objects.get(name=sitename)
            except Site.DoesNotExist:
                # we didn't find it in the data model
                continue
+ allocated_slivers = 0
+ if model_site and slice:
+ for sliver in slice.slivers.all():
+ if sliver.node.site == model_site:
+ allocated_slivers = allocated_slivers + 1
+
row["lat"] = float(model_site.location.latitude)
row["long"] = float(model_site.location.longitude)
row["url"] = model_site.site_url
row["numNodes"] = model_site.nodes.count()
+ row["allocated_slivers"] = allocated_slivers
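+            # scale the max CPU reading into a 0..1 "hotness" value between
+            # the BLUE_LOAD and RED_LOAD thresholds defined elsewhere in this module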
max_cpu = row.get("max_avg_cpu", row.get("max_cpu",0))
cpu=float(max_cpu)/100.0
row["hotness"] = max(0.0, ((cpu*RED_LOAD) - BLUE_LOAD)/(RED_LOAD-BLUE_LOAD))
- def compose_latest_query(self, fieldNames=None, groupByFields=["%hostname", "event"]):
+ def compose_cached_query(self, querySpec='default'):
""" Compose a query that returns the 'most recent' row for each (hostname, event)
pair.
+
+        Note that groupByFields cannot contain any fields that are NULL, or
+        those rows will be excluded. For example, if groupByFields includes
+        cp, then there will be no libvirt_event rows, since libvirt_event
+        does not have cp.
+
+        This means we can't really have one query to rule them all. Instead,
+        settle on a couple of different queries and let the caller specify
+        which one to use.
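+
+        Typical usage (run_query is inherited from BigQueryAnalytics):
+
+            q = self.compose_cached_query("hpc")
+            rows = self.run_query(q)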
"""
- if not fieldNames:
- fieldNames = ["%hostname", "%bytes_sent", "time", "event", "%site", "%elapsed", "%slice", "%cpu"]
+ fieldNames = ["%hostname", "%bytes_sent", "%bytes_hit", "%healthy", "time", "event", "%site", "%elapsed", "%cpu"]
+
+ if querySpec=="default":
+ groupByFields = ["%hostname", "event"]
+ elif (querySpec=="hpc"):
+ fieldNames.append("%cp")
+ groupByFields = ["%hostname", "event", "%cp"]
+ else:
+ raise ValueError("Unknown queryspec %s" % querySpec)
fields = ["table1.%s AS %s" % (x,x) for x in fieldNames]
fields = ", ".join(fields)
return base_query
- def get_cached_query_results(self, q):
+ def get_cached_query_results(self, q, wait=True):
global glo_cached_queries
-        if q in glo_cached_queries:
+        # entries record when they were fetched; treat ones younger than 60
+        # seconds (an assumed TTL) as fresh
+        if (q in glo_cached_queries) and (time.time() - glo_cached_queries[q]["time"] < 60):
print "using cached query"
return glo_cached_queries[q]["rows"]
+ if not wait:
+ return None
+
print "refreshing cached query"
result = self.run_query(q)
glo_cached_queries[q] = {"time": time.time(), "rows": result}
        slice = req.GET.get("slice", None)
        site = req.GET.get("site", None)
node = req.GET.get("node", None)
service = req.GET.get("service", None)
+ event = req.GET.get("event", "libvirt_heartbeat")
+ cp = req.GET.get("cp", None)
format = req.GET.get("format", "json_dicts")
- timeField = req.GET.get("timeBucket", "60")
+ timeBucket = int(req.GET.get("timeBucket", 60))
avg = self.get_list_from_req(req, "avg")
sum = self.get_list_from_req(req, "sum")
count = self.get_list_from_req(req, "count")
maxRows = req.GET.get("maxRows", None)
mergeDataModelSites = req.GET.get("mergeDataModelSites", None)
+ maxAge = int(req.GET.get("maxAge", 60*60))
+
cached = req.GET.get("cached", None)
+ cachedGroupBy = self.get_list_from_req(req, "cachedGroupBy", ["doesnotexist"])
+
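+        # Build the filter dict consumed by compose_query and
+        # postprocess_results; postprocess matches these keys against row
+        # field names, which is why the node filter is stored as "hostname".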
+ filter={}
+ if slice:
+ filter["slice"] = slice
+ if site:
+ filter["site"] = site
+ if node:
+ filter["hostname"] = node
+ if event:
+ filter["event"] = event
+ if cp:
+ filter["cp"] = cp
- q = self.compose_query(slice, site, node, service, timeField, avg, sum, count, computed, [], groupBy, orderBy)
+ q = self.compose_query(filter, timeBucket, avg, sum, count, computed, [], groupBy, orderBy, maxAge=maxAge)
print q
table = {}
table["cols"] = self.schema_to_cols(bq_result["schema"])
rows = []
- for row in bq_result["rows"]:
- rowcols = []
- for (colnum,col) in enumerate(row["f"]):
- if (colnum==0):
- dt = datetime.datetime.fromtimestamp(float(col["v"]))
- rowcols.append({"v": 'new Date("%s")' % dt.isoformat()})
- else:
- try:
- rowcols.append({"v": float(col["v"])})
- except:
- rowcols.append({"v": col["v"]})
- rows.append({"c": rowcols})
+ if "rows" in bq_result:
+ for row in bq_result["rows"]:
+ rowcols = []
+ for (colnum,col) in enumerate(row["f"]):
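+                    # column 0 is the time bucket; emit it as a JavaScript
+                    # Date literal for the Google Visualization response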
+ if (colnum==0):
+ dt = datetime.datetime.fromtimestamp(float(col["v"]))
+ rowcols.append({"v": 'new Date("%s")' % dt.isoformat()})
+ else:
+                        try:
+                            rowcols.append({"v": float(col["v"])})
+                        except (ValueError, TypeError):
+                            rowcols.append({"v": col["v"]})
+ rows.append({"c": rowcols})
table["rows"] = rows
if tqx:
else:
if cached:
- results = self.get_cached_query_results(self.compose_latest_query())
-
- filter={}
- if slice:
- filter["slice"] = slice
- if site:
- filter["site"] = site
- if node:
- filter["hostname"] = node
+ results = self.get_cached_query_results(self.compose_cached_query(cached))
- result = self.postprocess_results(results, filter=filter, sum=sum, count=count, avg=avg, computed=computed, maxDeltaTime=120, groupBy=["doesnotexist"])
+ result = self.postprocess_results(results, filter=filter, sum=sum, count=count, avg=avg, computed=computed, maxDeltaTime=120, groupBy=cachedGroupBy)
else:
result = self.run_query(q)
return result
def main():
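+    # Ad-hoc smoke test: build the cached query, aggregate average CPU and
+    # host count over libvirt_heartbeat rows, and dump the result.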
- bq = PlanetStackAnalytics()
+ bq = PlanetStackAnalytics(tableName="demoevents")
- q = bq.compose_latest_query()
+ q = bq.compose_cached_query()
results = bq.run_query(q)
- results = bq.postprocess_results(results,
- #filter={"site": "Princeton"},
- groupBy=["site"],
- computed=["bytes_sent/elapsed"],
- sum=["bytes_sent", "computed_bytes_sent_div_elapsed"], avg=["cpu"],
- maxDeltaTime=60)
+ #results = bq.postprocess_results(results,
+ # filter={"slice": "HyperCache"},
+ # groupBy=["site"],
+ # computed=["bytes_sent/elapsed"],
+ # sum=["bytes_sent", "computed_bytes_sent_div_elapsed"], avg=["cpu"],
+ # maxDeltaTime=60)
+
+ #results = bq.postprocess_results(results, filter={"slice": "HyperCache"}, maxi=["cpu"], count=["hostname"], computed=["bytes_sent/elapsed"], groupBy=["Time", "site"], maxDeltaTime=80)
+
+ results = bq.postprocess_results(results,filter={"event": "libvirt_heartbeat"}, avg=["cpu"], count=["hostname"], groupBy=["doesnotexist"])
bq.dump_table(results)
+ sys.exit(0)
+
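+    # NOTE: everything below sys.exit(0) is unreachable scratch code, kept
+    # only for reference.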
q=bq.compose_query(sum=["%bytes_sent"], avg=["%cpu"], latest=True, groupBy=["Time", "%site"])
print q
bq.dump_table(bq.run_query(q))
- sys.exit(0)
-
-    q=bq.compose_query(avg=["%cpu","%bandwidth"], count=["%hostname"], slice="HyperCache")
+    q=bq.compose_query(filter={"slice": "HyperCache"}, avg=["%cpu","%bandwidth"], count=["%hostname"])
print q
bq.dump_table(bq.run_query(q))