user's can't set/unset is_admin, is_active and is_readonly values in Login Details...
[plstackapi.git] / planetstack / hpc_wizard / planetstack_analytics.py
index 8fee0dc..75462d4 100644 (file)
@@ -1,4 +1,4 @@
-from bigquery_analytics import BigQueryAnalytics
+from bigquery_analytics import BigQueryAnalytics, BIGQUERY_AVAILABLE
 import datetime
 import re
 import os
@@ -14,6 +14,7 @@ else:
     sys.path.append("/opt/planetstack")
 
 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings")
+from django.conf import settings
 from django import db
 from django.db import connection
 from core.models import Slice, Sliver, ServiceClass, Reservation, Tag, Network, User, Node, Image, Deployment, Site, NetworkTemplate, NetworkSlice, Service
@@ -24,7 +25,10 @@ RED_LOAD=15000000
 glo_cached_queries = {}
 
 class PlanetStackAnalytics(BigQueryAnalytics):
-    def __init__(self, tableName="demoevents"):
+    def __init__(self, tableName=None):
+        if not tableName:
+            tableName = settings.BIGQUERY_TABLE
+
         BigQueryAnalytics.__init__(self, tableName)
 
     def service_to_sliceNames(self, serviceName):
@@ -38,8 +42,12 @@ class PlanetStackAnalytics(BigQueryAnalytics):
 
         return [slice.name for slice in slices]
 
-    def compose_query(self, slice=None, site=None, node=None, service=None, timeBucket="60", avg=[], sum=[], count=[], computed=[], val=[], groupBy=["Time"], orderBy=["Time"], tableName="demoevents", latest=False):
-        tablePart = "[%s.%s@-3600000--1]" % ("vicci", tableName)
+    def compose_query(self, filter={}, timeBucket="60", avg=[], sum=[], count=[], computed=[], val=[], groupBy=["Time"], orderBy=["Time"], tableName=None, latest=False, maxAge=60*60):
+        if tableName is None:
+            tableName = self.tableName
+
+        maxAge = maxAge * 1000
+        tablePart = "[%s.%s@-%d--1]" % ("vicci", tableName, maxAge)
 
         fields = []
         fieldNames = []
@@ -91,14 +99,16 @@ class PlanetStackAnalytics(BigQueryAnalytics):
 
         where = []
 
-        if slice:
-            where.append("%%slice='%s'" % slice)
-        if site:
-            where.append("%%site='%s'" % site)
-        if node:
-            where.append("%%hostname='%s'" % node)
-        if service:
-            sliceNames = self.service_to_sliceNames(service)
+        if filter.get("slice",None):
+            where.append("%%slice='%s'" % filter["slice"])
+        if filter.get("site",None):
+            where.append("%%site='%s'" % filter["site"])
+        if filter.get("node",None):
+            where.append("%%hostname='%s'" % filter["node"])
+        if filter.get("event",None):
+            where.append("event='%s'" % filter["event"])
+        if filter.get("service",None):
+            sliceNames = self.service_to_sliceNames(filter["service"])
             if sliceNames:
                 where.append("(" + " OR ".join(["%%slice='%s'" % sliceName for sliceName in sliceNames]) +")")
 
@@ -170,8 +180,13 @@ class PlanetStackAnalytics(BigQueryAnalytics):
         return value.split(",")
 
     def format_result(self, format, result, query, dataSourceUrl):
+        if not BIGQUERY_AVAILABLE:
+            msg = "BigQuery Statistics Unavaiable"
+        else:
+            msg = None
+
         if (format == "json_dicts"):
-            result = {"query": query, "rows": result, "dataSourceUrl": dataSourceUrl}
+            result = {"query": query, "rows": result, "dataSourceUrl": dataSourceUrl, "msg": msg}
             return ("application/javascript", json.dumps(result))
 
         elif (format == "json_arrays"):
@@ -181,7 +196,7 @@ class PlanetStackAnalytics(BigQueryAnalytics):
                 for key in sorted(row.keys()):
                     new_row.append(row[key])
                 new_result.append(new_row)
-                new_result = {"query": query, "rows": new_result}
+                new_result = {"query": query, "rows": new_result, "msg": msg}
             return ("application/javascript", json.dumps(new_result))
 
         elif (format == "html_table"):
@@ -196,10 +211,17 @@ class PlanetStackAnalytics(BigQueryAnalytics):
 
             return ("text/html", new_result)
 
-    def merge_datamodel_sites(self, rows):
+    def merge_datamodel_sites(self, rows, slice=None):
         """ For a query that included "site" in its groupby, merge in the
             opencloud site information.
         """
+
+        if slice:
+            try:
+                slice = Slice.objects.get(name=slice)
+            except:
+                slice = None
+
         for row in rows:
             sitename = row["site"]
             try:
@@ -208,22 +230,45 @@ class PlanetStackAnalytics(BigQueryAnalytics):
                 # we didn't find it in the data model
                 continue
 
+            allocated_slivers = 0
+            if model_site and slice:
+                for sliver in slice.slivers.all():
+                    if sliver.node.site == model_site:
+                        allocated_slivers = allocated_slivers + 1
+
             row["lat"] = float(model_site.location.latitude)
             row["long"] = float(model_site.location.longitude)
             row["url"] = model_site.site_url
             row["numNodes"] = model_site.nodes.count()
+            row["allocated_slivers"] = allocated_slivers
 
             max_cpu = row.get("max_avg_cpu", row.get("max_cpu",0))
             cpu=float(max_cpu)/100.0
             row["hotness"] = max(0.0, ((cpu*RED_LOAD) - BLUE_LOAD)/(RED_LOAD-BLUE_LOAD))
 
-    def compose_latest_query(self, fieldNames=None, groupByFields=["%hostname", "event"]):
+    def compose_cached_query(self, querySpec='default'):
         """ Compose a query that returns the 'most recent' row for each (hostname, event)
             pair.
+
+            Note that groupByFields cannot contain any values that are 'Null' or those
+            rows will be excluded. For example, if groupByFields includes cp, then
+            there will be no libvirt_event rows, since libvirt_event does not have
+            cp.
+
+            This means we can't really have 'one query to rule them'. Settle on
+            having a couple of different queries, and have the caller specify
+            which one he wants.
         """
 
-        if not fieldNames:
-            fieldNames = ["%hostname", "%bytes_sent", "time", "event", "%site", "%elapsed", "%slice", "%cpu"]
+        fieldNames = ["%hostname", "%bytes_sent", "%bytes_hit", "%healthy", "time", "event", "%site", "%elapsed", "%cpu"]
+
+        if querySpec=="default":
+            groupByFields = ["%hostname", "event"]
+        elif (querySpec=="hpc"):
+            fieldNames.append("%cp")
+            groupByFields = ["%hostname", "event", "%cp"]
+        else:
+            raise ValueError("Unknown queryspec %s" % querySpec)
 
         fields = ["table1.%s AS %s" % (x,x) for x in fieldNames]
         fields = ", ".join(fields)
@@ -242,7 +287,7 @@ class PlanetStackAnalytics(BigQueryAnalytics):
 
         return base_query
 
-    def get_cached_query_results(self, q):
+    def get_cached_query_results(self, q, wait=True):
         global glo_cached_queries
 
         if q in glo_cached_queries:
@@ -250,6 +295,9 @@ class PlanetStackAnalytics(BigQueryAnalytics):
                 print "using cached query"
                 return glo_cached_queries[q]["rows"]
 
+        if not wait:
+            return None
+
         print "refreshing cached query"
         result = self.run_query(q)
         glo_cached_queries[q] = {"time": time.time(), "rows": result}
@@ -265,10 +313,12 @@ class PlanetStackAnalytics(BigQueryAnalytics):
         site = req.GET.get("site", None)
         node = req.GET.get("node", None)
         service = req.GET.get("service", None)
+        event = req.GET.get("event", "libvirt_heartbeat")
+        cp = req.GET.get("cp", None)
 
         format = req.GET.get("format", "json_dicts")
 
-        timeField = req.GET.get("timeBucket", "60")
+        timeBucket = int(req.GET.get("timeBucket", 60))
         avg = self.get_list_from_req(req, "avg")
         sum = self.get_list_from_req(req, "sum")
         count = self.get_list_from_req(req, "count")
@@ -279,9 +329,24 @@ class PlanetStackAnalytics(BigQueryAnalytics):
         maxRows = req.GET.get("maxRows", None)
         mergeDataModelSites = req.GET.get("mergeDataModelSites", None)
 
+        maxAge = int(req.GET.get("maxAge", 60*60))
+
         cached = req.GET.get("cached", None)
+        cachedGroupBy = self.get_list_from_req(req, "cachedGroupBy", ["doesnotexist"])
+
+        filter={}
+        if slice:
+            filter["slice"] = slice
+        if site:
+            filter["site"] = site
+        if node:
+            filter["hostname"] = node
+        if event:
+            filter["event"] = event
+        if cp:
+            filter["cp"] = cp
 
-        q = self.compose_query(slice, site, node, service, timeField, avg, sum, count, computed, [], groupBy, orderBy)
+        q = self.compose_query(filter, timeBucket, avg, sum, count, computed, [], groupBy, orderBy, maxAge=maxAge)
 
         print q
 
@@ -311,18 +376,19 @@ class PlanetStackAnalytics(BigQueryAnalytics):
             table = {}
             table["cols"] = self.schema_to_cols(bq_result["schema"])
             rows = []
-            for row in bq_result["rows"]:
-                rowcols = []
-                for (colnum,col) in enumerate(row["f"]):
-                    if (colnum==0):
-                        dt = datetime.datetime.fromtimestamp(float(col["v"]))
-                        rowcols.append({"v": 'new Date("%s")' % dt.isoformat()})
-                    else:
-                        try:
-                            rowcols.append({"v": float(col["v"])})
-                        except:
-                            rowcols.append({"v": col["v"]})
-                rows.append({"c": rowcols})
+            if "rows" in bq_result:
+                for row in bq_result["rows"]:
+                    rowcols = []
+                    for (colnum,col) in enumerate(row["f"]):
+                        if (colnum==0):
+                            dt = datetime.datetime.fromtimestamp(float(col["v"]))
+                            rowcols.append({"v": 'new Date("%s")' % dt.isoformat()})
+                        else:
+                            try:
+                                rowcols.append({"v": float(col["v"])})
+                            except:
+                                rowcols.append({"v": col["v"]})
+                    rows.append({"c": rowcols})
             table["rows"] = rows
 
             if tqx:
@@ -343,17 +409,9 @@ class PlanetStackAnalytics(BigQueryAnalytics):
 
         else:
             if cached:
-                results = self.get_cached_query_results(self.compose_latest_query())
-
-                filter={}
-                if slice:
-                    filter["slice"] = slice
-                if site:
-                    filter["site"] = site
-                if node:
-                    filter["hostname"] = node
+                results = self.get_cached_query_results(self.compose_cached_query(cached))
 
-                result = self.postprocess_results(results, filter=filter, sum=sum, count=count, avg=avg, computed=computed, maxDeltaTime=120, groupBy=["doesnotexist"])
+                result = self.postprocess_results(results, filter=filter, sum=sum, count=count, avg=avg, computed=computed, maxDeltaTime=120, groupBy=cachedGroupBy)
             else:
                 result = self.run_query(q)
 
@@ -372,26 +430,30 @@ def DoPlanetStackAnalytics(request):
     return result
 
 def main():
-    bq = PlanetStackAnalytics()
+    bq = PlanetStackAnalytics(tableName="demoevents")
 
-    q = bq.compose_latest_query()
+    q = bq.compose_cached_query()
     results = bq.run_query(q)
 
-    results = bq.postprocess_results(results,
-                                     #filter={"site": "Princeton"},
-                                     groupBy=["site"],
-                                     computed=["bytes_sent/elapsed"],
-                                     sum=["bytes_sent", "computed_bytes_sent_div_elapsed"], avg=["cpu"],
-                                     maxDeltaTime=60)
+    #results = bq.postprocess_results(results,
+    #                                 filter={"slice": "HyperCache"},
+    #                                 groupBy=["site"],
+    #                                 computed=["bytes_sent/elapsed"],
+    #                                 sum=["bytes_sent", "computed_bytes_sent_div_elapsed"], avg=["cpu"],
+    #                                 maxDeltaTime=60)
+
+    #results = bq.postprocess_results(results, filter={"slice": "HyperCache"}, maxi=["cpu"], count=["hostname"], computed=["bytes_sent/elapsed"], groupBy=["Time", "site"], maxDeltaTime=80)
+
+    results = bq.postprocess_results(results,filter={"event": "libvirt_heartbeat"}, avg=["cpu"], count=["hostname"], groupBy=["doesnotexist"])
 
     bq.dump_table(results)
 
+    sys.exit(0)
+
     q=bq.compose_query(sum=["%bytes_sent"], avg=["%cpu"], latest=True, groupBy=["Time", "%site"])
     print q
     bq.dump_table(bq.run_query(q))
 
-    sys.exit(0)
-
     q=bq.compose_query(avg=["%cpu","%bandwidth"], count=["%hostname"], slice="HyperCache")
     print q
     bq.dump_table(bq.run_query(q))