# cache mappings, schema_to_cols function
# [plstackapi.git] / planetstack / hpc_wizard / planetstack_analytics.py
1 from bigquery_analytics import BigQueryAnalytics
2 import os
3 import sys
4 import json
5 import traceback
6
7 if os.path.exists("/home/smbaker/projects/vicci/plstackapi/planetstack"):
8     sys.path.append("/home/smbaker/projects/vicci/plstackapi/planetstack")
9 else:
10     sys.path.append("/opt/planetstack")
11
12 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings")
13 from django import db
14 from django.db import connection
15 from core.models import Slice, Sliver, ServiceClass, Reservation, Tag, Network, User, Node, Image, Deployment, Site, NetworkTemplate, NetworkSlice, Service
16
17 BLUE_LOAD=5000000
18 RED_LOAD=15000000
19
class PlanetStackAnalytics(BigQueryAnalytics):
    """Builds and runs BigQuery queries over PlanetStack monitoring events.

    Translates HTTP request parameters (slice/site/node/service filters,
    aggregate field lists, grouping and ordering) into a BigQuery SQL
    string, executes it via BigQueryAnalytics, post-processes the result
    rows, and serializes them into one of several output formats.
    """

    def __init__(self, tableName="demoevents"):
        BigQueryAnalytics.__init__(self, tableName)

    def service_to_sliceNames(self, serviceName):
        """Return the names of all slices that belong to the named service.

        Raises Service.DoesNotExist if no service has that name.
        """
        service = Service.objects.get(name=serviceName)
        try:
            slices = service.slices.all()
        except AttributeError:
            # BUG in data model -- Slice.service has related name 'service' and
            #                      it should be 'slices'
            slices = service.service.all()

        return [slice.name for slice in slices]

    def compose_query(self, slice=None, site=None, node=None, service=None, timeField="MinuteTime", avg=None, sum=None, count=None, computed=None, groupBy=None, orderBy=None, tableName="demoevents"):
        """Build a BigQuery SQL string over the trailing hour of events.

        Arguments:
            slice, site, node: optional equality filters on the
                %slice / %site / %hostname event fields.
            service: optional service name; expands into an OR over the
                service's slice names (looked up via the data model).
            timeField: "MinuteTime", "HourTime" or "DayTime" -- selects
                the time-bucketing granularity of the query.
            avg, sum, count: lists of event field names to aggregate
                (AVG / SUM / COUNT DISTINCT respectively).
            computed: list of "a/b" (or "a*b") expressions, emitted as
                SUM(a)/SUM(b) (resp. SUM(a)*SUM(b)) per host in a
                subquery, then re-aggregated across hosts.
            groupBy, orderBy: lists of output column names; both default
                to ["MinuteTime"].
            tableName: BigQuery table to query.

        Returns the SQL query string.

        NOTE(review): filter values are interpolated into the SQL string
        unescaped -- callers must not pass untrusted input here.
        """
        avg = avg or []
        sum = sum or []
        count = count or []
        computed = computed or []
        if groupBy is None:
            groupBy = ["MinuteTime"]
        if orderBy is None:
            orderBy = ["MinuteTime"]

        # Table decorator restricts the query to the last hour of data.
        tablePart = "%s.%s@-3600000--1" % ("vicci", tableName)

        fields = []
        fieldNames = []

        # Time-bucketing expression; the alias matches timeField so the
        # outer query and GROUP BY can reference it by name.
        if (timeField=="MinuteTime"):
            fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/60)*60 as MinuteTime")
        elif (timeField=="HourTime"):
            fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/60/60)*60*60 as HourTime")
        elif (timeField=="DayTime"):
            fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/60/60/24)*60*60*24 as DayTime")

        for fieldName in avg:
            fields.append("AVG(%s) as avg_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("avg_%s" % fieldName.replace("%",""))

        for fieldName in sum:
            fields.append("SUM(%s) as sum_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("sum_%s" % fieldName.replace("%",""))

        for fieldName in count:
            fields.append("COUNT(distinct %s) as count_%s" % (fieldName, fieldName.replace("%","")))
            fieldNames.append("count_%s" % fieldName.replace("%",""))

        for fieldName in computed:
            # BUGFIX: the original indexed parts[1] before checking whether
            # the expression contained "/", and split the *list* `computed`
            # (instead of fieldName) in the "*" fallback -- either path
            # crashed for multiplicative expressions.
            if "/" in fieldName:
                operator = "/"
                opLabel = "div"
                parts = fieldName.split("/")
            else:
                operator = "*"
                opLabel = "mult"
                parts = fieldName.split("*")
            computedFieldName = "computed_%s_%s_%s" % (parts[0].replace("%",""), opLabel, parts[1].replace("%",""))
            fields.append("SUM(%s)%sSUM(%s) as %s" % (parts[0], operator, parts[1], computedFieldName))
            fieldNames.append(computedFieldName)

        for fieldName in groupBy:
            # Time buckets are already in the select list; add any other
            # grouping columns so they can be grouped on.
            if (fieldName not in ["MinuteTime", "HourTime", "DayTime"]):
                fields.append(fieldName)
                fieldNames.append(fieldName)

        fields = ", ".join(fields)

        where = []

        # WARNING: values are interpolated unescaped (see docstring).
        if slice:
            where.append("%%slice='%s'" % slice)
        if site:
            where.append("%%site='%s'" % site)
        if node:
            where.append("%%hostname='%s'" % node)
        if service:
            sliceNames = self.service_to_sliceNames(service)
            if sliceNames:
                where.append("(" + " OR ".join(["%%slice='%s'" % sliceName for sliceName in sliceNames]) +")")

        if where:
            where = " WHERE " + " AND ".join(where)
        else:
            where =""

        if groupBy:
            # The subquery additionally groups by host so per-host ratios
            # can be re-aggregated in the outer query.
            groupBySub = " GROUP BY " + ",".join(groupBy + ["%hostname"])
            groupBy = " GROUP BY " + ",".join(groupBy)
        else:
            groupBySub = " GROUP BY %hostname"
            groupBy = ""

        if orderBy:
            orderBy = " ORDER BY " + ",".join(orderBy)
        else:
            orderBy = ""

        if computed:
            # Computed ratios must be aggregated per host first, then
            # re-aggregated across hosts in an outer query.
            subQuery = "SELECT %%hostname, %s FROM [%s]" % (fields, tablePart)
            if where:
                subQuery = subQuery + where
            subQuery = subQuery + groupBySub

            sumFields = []
            for fieldName in fieldNames:
                if fieldName.startswith("avg"):
                    sumFields.append("AVG(%s) as avg_%s"%(fieldName,fieldName))
                    sumFields.append("MAX(%s) as max_%s"%(fieldName,fieldName))
                elif (fieldName.startswith("count")) or (fieldName.startswith("sum")) or (fieldName.startswith("computed")):
                    sumFields.append("SUM(%s) as sum_%s"%(fieldName,fieldName))
                else:
                    sumFields.append(fieldName)

            sumFields = ",".join(sumFields)

            query = "SELECT %s, %s FROM (%s)" % (timeField, sumFields, subQuery)
            if groupBy:
                query = query + groupBy
            if orderBy:
                query = query + orderBy
        else:
            query = "SELECT %s FROM [%s]" % (fields, tablePart)
            if where:
                query = query + " " + where
            if groupBy:
                query = query + groupBy
            if orderBy:
                query = query + orderBy

        return query

    def get_list_from_req(self, req, name, default=None):
        """Return req.GET[name] split on commas, or `default` (treated as
        [] when None) if the parameter is absent or empty.
        """
        value = req.GET.get(name, None)
        if not value:
            return default if default is not None else []
        return value.split(",")

    def format_result(self, format, result, query):
        """Serialize query result rows into a (content_type, body) tuple.

        Supported formats: "json_dicts", "json_arrays", "html_table",
        "json_hpcdash". Returns None for an unrecognized format.
        """
        if (format == "json_dicts"):
            result = {"query": query, "rows": result}
            return ("application/javascript", json.dumps(result))

        elif (format == "json_arrays"):
            new_result = []
            for row in result:
                new_row = []
                for key in sorted(row.keys()):
                    new_row.append(row[key])
                new_result.append(new_row)
            # BUGFIX: the original rebound new_result to this dict *inside*
            # the loop, so the second iteration's .append() raised
            # AttributeError.
            new_result = {"query": query, "rows": new_result}
            return ("application/javascript", json.dumps(new_result))

        elif (format == "html_table"):
            new_rows = []
            for row in result:
                new_row = []
                for key in sorted(row.keys()):
                    new_row.append("<TD>%s</TD>" % str(row[key]))
                new_rows.append("<TR>%s</TR>" % "".join(new_row))

            new_result = "<TABLE>%s</TABLE>" % "\n".join(new_rows)

            return ("text/html", new_result)

        elif (format == "json_hpcdash"):
            # One entry per site, keyed by site name, with the fields the
            # HPC dashboard front-end expects.
            new_rows = {}
            for row in result:
                new_row = {"lat": float(row.get("lat", 0)),
                           "long": float(row.get("long", 0)),
                           "health": 0,
                           "numNodes": int(row.get("numNodes",0)),
                           "numHPCSlivers": int(row.get("sum_count_hostname", 0)),
                           "siteUrl": row.get("url", ""),
                           "hot": float(row.get("hotness", 0.0)),
                           "load": int(float(row.get("max_avg_cpu", 0)))}
                new_rows[row["site"]] = new_row
            return ("application/javascript", json.dumps(new_rows))

    def only_largest(self, rows, fieldName):
        """ Given a fieldName, only return the set of rows that had the
            maximum value of that fieldName. An empty input yields an
            empty list (the original raised ValueError on max([])).
        """
        if not rows:
            return []
        maxVal = max( [int(row[fieldName]) for row in rows] )
        new_rows = [row for row in rows if int(row[fieldName])==maxVal]
        return new_rows

    def merge_datamodel_sites(self, rows):
        """ For a query that included "site" in its groupby, merge in the
            opencloud site information (location, url, node count) in
            place. Rows whose site is unknown to the data model are left
            untouched.
        """
        for row in rows:
            sitename = row["site"]
            try:
                model_site = Site.objects.get(name=sitename)
            except Exception:
                # we didn't find it in the data model -- best-effort, skip
                continue

            row["lat"] = float(model_site.location.latitude)
            row["long"] = float(model_site.location.longitude)
            row["url"] = model_site.site_url
            row["numNodes"] = model_site.nodes.count()

            if "max_avg_cpu" in row:
                # Map CPU load onto a 0..1 "hotness" between the BLUE_LOAD
                # and RED_LOAD thresholds, clamped at 0.
                cpu=float(row["max_avg_cpu"])/100.0
                row["hotness"] = max(0.0, ((cpu*RED_LOAD) - BLUE_LOAD)/(RED_LOAD-BLUE_LOAD))

    def process_request(self, req):
        """Parse an analytics HTTP request, run the resulting query, and
        return a (content_type, body) tuple ready for an HTTP response.
        """
        print(req.GET)

        tqx = req.GET.get("reqId", None)

        slice = req.GET.get("slice", None)
        site = req.GET.get("site", None)
        node = req.GET.get("node", None)
        service = req.GET.get("service", None)

        format = req.GET.get("format", "json_dicts")

        timeField = req.GET.get("timeField", "MinuteTime")
        avg = self.get_list_from_req(req, "avg")
        sum = self.get_list_from_req(req, "sum")
        count = self.get_list_from_req(req, "count")
        computed = self.get_list_from_req(req, "computed")
        groupBy = self.get_list_from_req(req, "groupBy", ["MinuteTime"])
        orderBy = self.get_list_from_req(req, "orderBy", ["MinuteTime"])

        maxRows = req.GET.get("maxRows", None)
        onlyLargest = req.GET.get("onlyLargest", None)
        mergeDataModelSites = req.GET.get("mergeDataModelSites", None)

        q = self.compose_query(slice, site, node, service, timeField, avg, sum, count, computed, groupBy, orderBy)

        print(q)

        if (format=="raw"):
            result = self.run_query_raw(q)
            result["reqId"] = 0        # XXX FIXME: should echo tqx from the request
            return ("application/javascript", json.dumps(result))
        else:
            result = self.run_query(q)

            if onlyLargest:
                result = self.only_largest(result, onlyLargest)

            if mergeDataModelSites:
                self.merge_datamodel_sites(result)

            if maxRows:
                # Keep only the trailing maxRows rows (the most recent,
                # given the default time ordering).
                result = result[-int(maxRows):]

            return self.format_result(format, result, q)
264
265
def DoPlanetStackAnalytics(request):
    """Web entry point: run the analytics request through a fresh
    PlanetStackAnalytics instance and return its (content_type, body)
    result tuple.
    """
    bq = PlanetStackAnalytics()
    return bq.process_request(request)
271
def main():
    """Ad-hoc smoke test: query per-site HPC Service stats for the latest
    minute, merge in data-model site info, and print the hpcdash JSON.
    """
    bq = PlanetStackAnalytics()

    # Earlier experiments (per-slice cpu averages, byte-rate ratios, and
    # hourly bucketing) were exercised here; see revision history.

    q = bq.compose_query(avg=["%cpu"], count=["%hostname"], computed=["%bytes_sent/%elapsed"], service="HPC Service", groupBy=["MinuteTime","%site"])
    print("")
    print(q)
    rows = bq.run_query(q)
    rows = bq.only_largest(rows, "MinuteTime")
    bq.merge_datamodel_sites(rows)
    print(bq.format_result("json_hpcdash", rows, q))
299
# Allow running this module directly as a command-line smoke test.
if __name__ == "__main__":
    main()
302
303
304
305
306