From 43adf1ba6f876a6df30d770c5c86be74141292c9 Mon Sep 17 00:00:00 2001 From: Scott Baker Date: Wed, 19 Mar 2014 21:54:55 -0700 Subject: [PATCH] check in hpc_wizard and analytics python source --- planetstack/hpc_wizard/README | 10 + planetstack/hpc_wizard/bigquery_analytics.py | 127 +++++++ planetstack/hpc_wizard/hpc_wizard.py | 340 ++++++++++++++++++ .../hpc_wizard/planetstack_analytics.py | 202 +++++++++++ planetstack/hpc_wizard/query.py | 271 ++++++++++++++ 5 files changed, 950 insertions(+) create mode 100644 planetstack/hpc_wizard/README create mode 100644 planetstack/hpc_wizard/bigquery_analytics.py create mode 100644 planetstack/hpc_wizard/hpc_wizard.py create mode 100644 planetstack/hpc_wizard/planetstack_analytics.py create mode 100644 planetstack/hpc_wizard/query.py diff --git a/planetstack/hpc_wizard/README b/planetstack/hpc_wizard/README new file mode 100644 index 0000000..06b12b5 --- /dev/null +++ b/planetstack/hpc_wizard/README @@ -0,0 +1,10 @@ +Two files are purposely not included in the git repository: + bigquery_credentials.dat + client_secrets.json + +These files must be manually installed. + +Additionally, the following packages must be installed: + yum -y install python-httplib2 + easy_install python_gflags + easy_install google_api_python_client diff --git a/planetstack/hpc_wizard/bigquery_analytics.py b/planetstack/hpc_wizard/bigquery_analytics.py new file mode 100644 index 0000000..ca08025 --- /dev/null +++ b/planetstack/hpc_wizard/bigquery_analytics.py @@ -0,0 +1,127 @@ +import re +import base64 +import requests +import urllib +import json +import httplib2 +import threading +import os +import time +import traceback + +from apiclient.discovery import build +from apiclient.errors import HttpError +from oauth2client.client import AccessTokenRefreshError +from oauth2client.client import OAuth2WebServerFlow +from oauth2client.client import flow_from_clientsecrets +from oauth2client.file import Storage +from oauth2client.tools import run_flow,run + +""" +yum -y install python-httplib2 +easy_install python_gflags +easy_install google_api_python_client +""" + +PROJECT_NUMBER = '549187599759' + +FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json', + scope='https://www.googleapis.com/auth/bigquery') + +MINUTE_MS = 60*1000 +HOUR_MS = 60*60*1000 + +class BigQueryAnalytics: + def __init__(self, table = "demoevents"): + self.projectName = "vicci" + self.tableName = table + self.mapping = json.loads(self.fetch_mapping(table=self.tableName)) + self.reverse_mapping = {v:k for k, v in self.mapping.items()} + + def fetch_mapping(self, m=0, table="events"): + req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table) + resp = requests.get(req) + if (resp.status_code==200): + return resp.text + else: + raise Exception('Error accessing register allocations: %d'%resp.status_code) + + def run_query_raw(self, query): + p = re.compile('%[a-zA-z_]*') + query = p.sub(self.remap, query) + + storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat') + credentials = storage.get() + + if credentials is None or credentials.invalid: + credentials = run(FLOW, storage) + + http = httplib2.Http() + http = credentials.authorize(http) + + service = build('bigquery', 'v2', http=http) + + body = {"query": query} + response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute() + + return response + + def translate_schema(self, response): + for field in response["schema"]["fields"]: + field["name"] = 
self.reverse_mapping.get(field["name"], field["name"]) + + def run_query(self, query): + response = self.run_query_raw(query) + + fieldNames = [] + for field in response["schema"]["fields"]: + fieldNames.append(field["name"]) + + result = [] + if "rows" in response: + for row in response["rows"]: + this_result = {} + for (i,column) in enumerate(row["f"]): + this_result[self.reverse_mapping.get(fieldNames[i],fieldNames[i])] = column["v"] + result.append(this_result) + + return result + + def remap(self, match): + token = match.group()[1:] + if token in self.mapping: + return self.mapping[token] + else: + raise Exception('unknown token %s' % token) + + def dump_table(self, rows, keys=None): + if not keys: + keys = rows[0].keys() + + lens = {} + for key in keys: + lens[key] = len(key) + + for row in rows: + for key in keys: + thislen = len(str(row.get(key,""))) + lens[key] = max(lens.get(key,0), thislen) + + for key in keys: + print "%*s" % (lens[key], key), + print + + for row in rows: + for key in keys: + print "%*s" % (lens[key], str(row.get(key,""))), + print + +def main(): + bq = BigQueryAnalytics() + + rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname") + + bq.dump_table(rows) + +if __name__ == "__main__": + main() diff --git a/planetstack/hpc_wizard/hpc_wizard.py b/planetstack/hpc_wizard/hpc_wizard.py new file mode 100644 index 0000000..d619f64 --- /dev/null +++ b/planetstack/hpc_wizard/hpc_wizard.py @@ -0,0 +1,340 @@ +import datetime +import os +import operator +import socket +import pytz +import json +import random +import sys +import time + +if os.path.exists("/home/smbaker/projects/vicci/plstackapi/planetstack"): + sys.path.append("/home/smbaker/projects/vicci/plstackapi/planetstack") +else: + sys.path.append("/opt/planetstack") + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings") +from django import db +from django.db import connection +from core.models import Slice, Sliver, ServiceClass, Reservation, Tag, Network, User, Node, Image, Deployment, Site, NetworkTemplate, NetworkSlice, Service +from hpc.models import HpcService, ServiceProvider, ContentProvider, OriginServer, CDNPrefix, HpcService + +# amount of time in milliseconds which will be queried for HPC statistics. +QUERY_TIME=150000 + +# Constants used for computing 'hotness' +# BLUE_LOAD = MB/s which should be a "0" on the hotness scale +# RED_LOAD = MB/s which should be a "1" on the hotness scale +BLUE_LOAD=5000000 +RED_LOAD=15000000 + +MAX_LOAD=RED_LOAD + +def log(what, showdate=True): + try: + if showdate: + file("/tmp/scott-hpcwizard.log", "a").write(time.strftime("%Y-%m-%d %H:%M:%S ", time.gmtime())) + file("/tmp/scott-hpcwizard.log", "a").write("%s\n" % what) + except: + pass # uh oh + +def log_exc(what): + log(what) + log(traceback.format_exc(), showdate=False) + +def avg(x): + return float(sum(x))/len(x) + +def format_float(x): + try: + return "%10.5f" % x + except: + return str(x) + +class HpcWizard: + def __init__(self): + try: + self.hpcService = HpcService.objects.get() + except: + # OpenCloud.us currently has a Service object instantiated instead + # of a HpcService. Fallback for now. 
+            self.hpcService = Service.objects.get(name="HPC Service")
+
+        self.hpcQueryThread = None
+
+    def get_hpc_slices(self):
+        try:
+            slices = self.hpcService.slices.all()
+        except:
+            # BUG in data model -- Slice.service has related name 'service' and
+            #    it should be 'slices'
+            slices = self.hpcService.service.all()
+        return slices
+
+    def get_hpc_slivers(self):
+        slivers = []
+        for slice in self.get_hpc_slices():
+            for sliver in slice.slivers.all():
+                slivers.append(sliver)
+        return slivers
+
+    def fill_site_nodes(self, site, hpc_slivers=None):
+        if hpc_slivers is None:
+            hpc_slivers = self.get_hpc_slivers()
+
+        site.availNodes = []
+        site.hpcNodes = []
+        for node in site.nodes.all():
+            has_hpc = False
+
+            for sliver in node.slivers.all():
+                if sliver in hpc_slivers:
+                    has_hpc = True
+
+            if has_hpc:
+                site.hpcNodes.append(node)
+            else:
+                site.availNodes.append(node)
+
+    def merge_site_statistics_old(self, sites):
+        """ This does it based on the sum of all bandwidth.
+
+            The issue here is that the computed load reacts immediately to
+            the addition or deletion of nodes. i.e. 5 nodes at 80% + 1 node at
+            0% = average load 66%.
+        """
+        site_dict = {}
+        for site in self.hpcQueryThread.site_rows:
+            site_dict[site["site"]] = site
+
+        for site in sites:
+            if site.name in site_dict:
+                site.bytes_sent = site_dict[site.name]["sum_bytes_sent"]
+                time_delta = site_dict[site.name]["time_delta"]
+                computed_duration = (int(time_delta/30)+1)*30
+                if (computed_duration > 0):
+                    site.bandwidth = site.bytes_sent/computed_duration
+                if len(site.hpcNodes)>0:
+                    # figure out how many bytes_sent would be represented
+                    # by blue and red
+                    blue_load = len(site.hpcNodes) * BLUE_LOAD * computed_duration
+                    red_load = len(site.hpcNodes) * RED_LOAD * computed_duration
+                    max_load = len(site.hpcNodes) * MAX_LOAD * computed_duration
+
+                    site.hotness = (min(red_load, max(blue_load, float(site.bytes_sent))) - blue_load)/(red_load-blue_load)
+                    site.load = int(min(100, site.bytes_sent*100/max_load))
+
+                    file("/tmp/scott2.txt","a").write("%s %d %0.2f %0.2f %0.2f %0.2f %d\n" % (site.name, site.bytes_sent, blue_load, red_load, site.hotness, time_delta, computed_duration))
+
+    def merge_site_statistics(self, sites):
+        """ This does it based on max load.
+
+            The advantage of this method is that since we're effectively reporting
+            the maximally loaded node, we don't get instantaneous reactions
+            to adding additional nodes. On the contrary, it will take a while
+            for the load to balance from the loaded node to the new less-loaded
+            node.
+ """ + site_dict = {} + for site in self.hpcQueryThread.site_rows: + site_dict[site["site"]] = site + + for site in sites: + if site.name in site_dict: + site.max_avg_bandwidth = site_dict[site.name]["max_avg_bandwidth"] + site.bytes_sent = site_dict[site.name]["sum_bytes_sent"] + + site.hotness = min(1.0, float(max(BLUE_LOAD, site.max_avg_bandwidth) - BLUE_LOAD) / RED_LOAD) + site.load = int(site.max_avg_bandwidth*100/MAX_LOAD) + + # we still need site["bandwidth"] for the summary statistics + time_delta = site_dict[site.name]["time_delta"] + computed_duration = (int(time_delta/30)+1)*30 + if (computed_duration > 0): + site.bandwidth = site.bytes_sent/computed_duration + else: + site.bandwidth = 0 + + if len(site.hpcNodes)>0: + file("/tmp/scott3.txt","a").write("%s %d %0.2f %d %0.2f\n" % (site.name, site.bytes_sent, site.hotness, site.load, site.bandwidth)) + + def get_sites(self): + sites = list(Site.objects.all()) + + for site in sites: + self.fill_site_nodes(site, self.get_hpc_slivers()) + site.load = 0 + site.hotness = 0 + site.bandwidth = 0 + site.numNodes = len(site.hpcNodes) + len(site.availNodes) + + if (self.hpcQueryThread is not None) and (self.hpcQueryThread.is_stalled()): + self.initialize_statistics() + + # merge in the statistics data if it is available + if self.hpcQueryThread and self.hpcQueryThread.data_version>0: + self.merge_site_statistics(sites) + + # django will leak extraordinary amounts of memory without this line + db.reset_queries() + + return sites + + def get_nodes_to_sites(self): + nodes_to_sites = {} + + sites = list(Site.objects.all()) + + for site in sites: + for node in site.nodes.all(): + nodes_to_sites[node.name] = site.name + + return nodes_to_sites + + def get_slice_sites(self, slice_name): + sites = list(Site.objects.all()) + slivers = list(Slice.objects.get(name=slice_name).slivers.all()) + for site in sites: + self.fill_site_nodes(site, slivers) + return sites + + def get_sites_for_view(self): + sites = {} + for site in self.get_sites(): + if site.name in ["ON.Lab", "I2 Atlanta"]: + continue + + d = {"lat": float(site.location.latitude), + "long": float(site.location.longitude), + "health": 0, + "numNodes": site.numNodes, + "numHPCSlivers": len(site.hpcNodes), + "siteUrl": str(site.site_url), + "hot": getattr(site,"hotness",0.0), + "load": getattr(site,"load",0)} + sites[str(site.name)] = d + + import pprint + f = file("/tmp/scott.txt","w") + pprint.pprint(sites, f) + f.close() + + return sites + + def get_summary_for_view(self): + total_slivers = 0 + total_bandwidth = 0 + average_cpu = 0 + + sites = [site for site in self.get_sites() if len(site.hpcNodes)>0] + + total_slivers = sum( [len(site.hpcNodes) for site in sites] ) + total_bandwidth = sum( [site.bandwidth for site in sites] ) + average_cpu = int(avg( [site.load for site in sites] )) + + return {"total_slivers": total_slivers, + "total_bandwidth": total_bandwidth, + "average_cpu": average_cpu} + + def initialize_statistics(self): + from query import HpcQueryThread + + if (self.hpcQueryThread is not None): + log("dropping old query thread") + self.hpcQueryThread.please_die = True + self.hpcQueryThread = None + + log("launching new query thread") + + nodes_to_sites = self.get_nodes_to_sites() + self.hpcQueryThread = HpcQueryThread(nodes_to_sites = nodes_to_sites, timeStart=-QUERY_TIME, slice="HyperCache") + + def get_site(self, site_name): + site = Site.objects.get(name=site_name) + self.fill_site_nodes(site) + return site + + def increase_slivers(self, site_name, count): + site = 
self.get_site(site_name) + hpc_slice = self.get_hpc_slices()[0] + while (len(site.availNodes) > 0) and (count > 0): + node = site.availNodes.pop() + hostname = node.name + sliver = Sliver(name=node.name, + slice=hpc_slice, + node=node, + image = Image.objects.all()[0], + creator = User.objects.get(email="scott@onlab.us"), + deploymentNetwork=node.deployment, + numberCores = 1, + ip=socket.gethostbyname(hostname)) + sliver.save() + + print "created sliver", sliver + + site.hpcNodes.append(node) + + count = count - 1 + + def decrease_slivers(self, site_name, count): + site = self.get_site(site_name) + hpc_slices = self.get_hpc_slices() + while (len(site.hpcNodes) > 0) and (count > 0): + node = site.hpcNodes.pop() + for sliver in node.slivers.all(): + if sliver.slice in hpc_slices: + print "deleting sliver", sliver + sliver.delete() + + site.availNodes.append(node) + count = count - 1 + + def dump(self): + print "slices:" + for slice in self.get_hpc_slices(): + print " ", slice + + print "sites:" + print "%20s %10s %10s %10s %10s %10s %10s" % ("name", "avail", "hpc", "lat", "long", "sent", "hot") + for site in self.get_sites(): + print "%20s %10d %10d %10s %10s %10d %10.2f" % (site.name, + len(site.availNodes), + len(site.hpcNodes), + format_float(site.location.latitude), + format_float(site.location.longitude), + getattr(site,"bytes_sent",0), + getattr(site,"hotness",0.5)) + + #print "slivers:" + #for sliver in self.get_hpc_slivers(): + # print " ", sliver + +glo_hpc_wizard = None + +def get_hpc_wizard(): + global glo_hpc_wizard + + if (glo_hpc_wizard is None): + glo_hpc_wizard = HpcWizard() + glo_hpc_wizard.initialize_statistics() + + return glo_hpc_wizard + +def main(): + x = HpcWizard() + + # initialized the Statistics thread, and wait for some data to show up + x.initialize_statistics() + while x.hpcQueryThread.data_version==0: + time.sleep(1) + + x.dump() + + # quick test of the increase / decrease functions + + x.increase_slivers("Princeton", 1) + x.decrease_slivers("Princeton", 1) + +if __name__=="__main__": + main() + diff --git a/planetstack/hpc_wizard/planetstack_analytics.py b/planetstack/hpc_wizard/planetstack_analytics.py new file mode 100644 index 0000000..cc34933 --- /dev/null +++ b/planetstack/hpc_wizard/planetstack_analytics.py @@ -0,0 +1,202 @@ +from bigquery_analytics import BigQueryAnalytics +import json + +class PlanetStackAnalytics(BigQueryAnalytics): + def __init__(self, tableName="demoevents"): + BigQueryAnalytics.__init__(self, tableName) + + def compose_query(self, slice=None, site=None, node=None, timeField="MinuteTime", avg=[], sum=[], count=[], computed=[], groupBy=["MinuteTime"], orderBy=["MinuteTime"], tableName="demoevents"): + tablePart = "%s.%s@-3600000--1" % ("vicci", tableName) + + fields = [] + fieldNames = [] + + if (timeField=="MinuteTime"): + fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/60)*60 as MinuteTime") + elif (timeField=="HourTime"): + fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/60/60)*60*60 as HourTime") + elif (timeField=="DayTime"): + fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/60/60/24)*60*60*24 as DayTime") + + for fieldName in avg: + fields.append("AVG(%s) as avg_%s" % (fieldName, fieldName.replace("%",""))) + fieldNames.append("avg_%s" % fieldName.replace("%","")) + + for fieldName in sum: + fields.append("SUM(%s) as sum_%s" % (fieldName, fieldName.replace("%",""))) + fieldNames.append("sum_%s" % fieldName.replace("%","")) + + for fieldName in count: + fields.append("COUNT(distinct %s) as count_%s" % (fieldName, 
fieldName.replace("%",""))) + fieldNames.append("count_%s" % fieldName.replace("%","")) + + for fieldName in computed: + operator = "/" + parts = fieldName.split("/") + computedFieldName = "computed_" + parts[0].replace("%","")+"_div_"+parts[1].replace("%","") + if len(parts)==1: + operator = "*" + parts = computed.split("*") + computedFieldName = "computed_" + parts[0].replace("%","")+"_mult_"+parts[1].replace("%","") + fields.append("SUM(%s)%sSUM(%s) as %s" % (parts[0], operator, parts[1], computedFieldName)) + fieldNames.append(computedFieldName) + + fields = ", ".join(fields) + + where = [] + + if slice: + where.append("%%slice='%s'" % slice) + if site: + where.append("%%site='%s'" % site) + if node: + where.append("%%hostname='%s'" % node) + + if where: + where = " WHERE " + " AND ".join(where) + else: + where ="" + + if groupBy: + groupBy = " GROUP BY " + ",".join(groupBy) + else: + groupBy = "" + + if orderBy: + orderBy = " ORDER BY " + ",".join(orderBy) + else: + orderBy = "" + + if computed: + subQuery = "SELECT %%hostname, %s FROM [%s]" % (fields, tablePart) + if where: + subQuery = subQuery + where + subQuery = subQuery + " GROUP BY %s,%%hostname" % timeField + + sumFields = [] + for fieldName in fieldNames: + if fieldName.startswith("avg"): + sumFields.append("AVG(%s) as avg_%s"%(fieldName,fieldName)) + else: + sumFields.append("SUM(%s) as sum_%s"%(fieldName,fieldName)) + + sumFields = ",".join(sumFields) + + query = "SELECT %s, %s FROM (%s)" % (timeField, sumFields, subQuery) + if groupBy: + query = query + groupBy + if orderBy: + query = query + orderBy + else: + query = "SELECT %s FROM [%s]" % (fields, tablePart) + if where: + query = query + " " + where + if groupBy: + query = query + groupBy + if orderBy: + query = query + orderBy + + return query + + def get_list_from_req(self, req, name, default=[]): + value = req.GET.get(name, None) + if not value: + return default + return value.split(",") + + def format_result(self, format, result, query): + if (format == "json_dicts"): + result = {"query": query, "rows": result} + return ("application/javascript", json.dumps(result)) + + elif (format == "json_arrays"): + new_result = [] + for row in result: + new_row = [] + for key in sorted(row.keys()): + new_row.append(row[key]) + new_result.append(new_row) + new_result = {"query": query, "rows": new_result} + return ("application/javascript", json.dumps(new_result)) + + elif (format == "html_table"): + new_rows = [] + for row in result: + new_row = [] + for key in sorted(row.keys()): + new_row.append("%s" % str(row[key])) + new_rows.append("%s" % "".join(new_row)) + + new_result = "%s
" % "\n".join(new_rows) + + return ("text/html", new_result) + + def process_request(self, req): + print req.GET + + tqx = req.GET.get("reqId", None) + + slice = req.GET.get("slice", None) + site = req.GET.get("site", None) + node = req.GET.get("node", None) + + format = req.GET.get("format", "json_dicts") + + timeField = req.GET.get("timeField", "MinuteTime") + avg = self.get_list_from_req(req, "avg") + sum = self.get_list_from_req(req, "sum") + count = self.get_list_from_req(req, "count") + computed = self.get_list_from_req(req, "computed") + groupBy = self.get_list_from_req(req, "groupBy", ["MinuteTime"]) + orderBy = self.get_list_from_req(req, "orderBy", ["MinuteTime"]) + + maxRows = req.GET.get("maxRows", None) + + q = self.compose_query(slice, site, node, timeField, avg, sum, count, computed, groupBy, orderBy) + + print q + + if (format=="raw"): + result = self.run_query_raw(q) + result["reqId"] = 0 # XXX FIXME + return ("application/javascript", json.dumps(result)) + else: + result = self.run_query(q) + + if maxRows: + result = result[-int(maxRows):] + + return self.format_result(format, result, q) + + +def DoPlanetStackAnalytics(request): + bq = PlanetStackAnalytics() + result = bq.process_request(request) + + return result + +def main(): + bq = PlanetStackAnalytics() + + q=bq.compose_query(avg=["%cpu"], count=["%hostname"], slice="HyperCache") + print q + bq.dump_table(bq.run_query(q)) + + q=bq.compose_query(computed=["%bytes_sent/%elapsed"]) + print + print q + bq.dump_table(bq.run_query(q)) + #print bq.run_query_raw(q) + + q=bq.compose_query(timeField="HourTime", avg=["%cpu"], count=["%hostname"], computed=["%bytes_sent/%elapsed"], groupBy=["HourTime"], orderBy=["HourTime"]) + print + print q + bq.dump_table(bq.run_query(q)) + +if __name__ == "__main__": + main() + + + + + diff --git a/planetstack/hpc_wizard/query.py b/planetstack/hpc_wizard/query.py new file mode 100644 index 0000000..3570a56 --- /dev/null +++ b/planetstack/hpc_wizard/query.py @@ -0,0 +1,271 @@ +import re +import base64 +import requests +import urllib +import json +import httplib2 +import threading +import os +import time +import traceback + +from apiclient.discovery import build +from apiclient.errors import HttpError +from oauth2client.client import AccessTokenRefreshError +from oauth2client.client import OAuth2WebServerFlow +from oauth2client.client import flow_from_clientsecrets +from oauth2client.file import Storage +from oauth2client.tools import run_flow,run + +""" +yum -y install python-httplib2 +easy_install python_gflags +easy_install google_api_python_client +""" + + +PROJECT_NUMBER = '549187599759' + +FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json', + scope='https://www.googleapis.com/auth/bigquery') + +MINUTE_MS = 60*1000 +HOUR_MS = 60*60*1000 + +class HpcQuery: + def __init__(self): + self.mapping = json.loads(self.fetch_mapping(table="demoevents")) + self.reverse_mapping = {v:k for k, v in self.mapping.items()} + + def fetch_mapping(self, m=0, table="events"): + req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table) + resp = requests.get(req) + if (resp.status_code==200): + return resp.text + else: + raise Exception('Error accessing register allocations: %d'%resp.status_code) + + def run_query_old(self, query): + req = 'http://cloud-scrutiny.appspot.com/command?action=send_query&q=%s' % urllib.quote(query) + resp = requests.get(req) + if (resp.status_code==200): + return resp.text + else: + raise 
Exception('Error running query: %d'%resp.status_code) + return resp + + def run_query(self, query): + storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat') + credentials = storage.get() + + if credentials is None or credentials.invalid: + credentials = run(FLOW, storage) + + http = httplib2.Http() + http = credentials.authorize(http) + + service = build('bigquery', 'v2', http=http) + + body = {"query": query} + response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute() + + fieldNames = [] + for field in response["schema"]["fields"]: + fieldNames.append(field["name"]) + + result = [] + if "rows" in response: + for row in response["rows"]: + this_result = {} + for (i,column) in enumerate(row["f"]): + this_result[self.reverse_mapping.get(fieldNames[i],fieldNames[i])] = column["v"] + result.append(this_result) + + return result + + def remap(self, match): + token = match.group()[1:] + if token in self.mapping: + return self.mapping[token] + else: + raise Exception('unknown token %s' % token) + + def get_usage(self, cp=None, hostname=None, site=None, slice=None, timeStart=-HOUR_MS, timeStop=-1, groupBy=["%hostname", "%cp"]): + where = [] + if slice is not None: + where.append("%slice='" + slice + "'") + if cp is not None: + where.append("%cp='" + cp + "'") + if hostname is not None: + where.append("%hostname='" + hostname + "'") + if site is not None: + where.append("%hostname contains " + site) + where.append("%bytes_sent>0") + where = "WHERE " + " AND ".join(where) + + if timeStart is not None: + tableName = "[vicci.demoevents@%d-%d]" % (timeStart,timeStop) + else: + tableName = "[vicci.demoevents]" + + query = "SELECT %hostname,%cp,sum(%bytes_sent) as sum_bytes_sent,sum(%bytes_hit) as sum_bytes_hit, AVG(%bandwidth) as avg_bandwidth," + \ + " MAX(TIMESTAMP_TO_MSEC(time))-MIN(TIMESTAMP_TO_MSEC(time)) as time_delta FROM " + \ + tableName + " " + where + + if groupBy: + query = query + " GROUP BY " + ",".join(groupBy) + + p = re.compile('%[a-zA-z_]*') + query = p.sub(self.remap, query) + + rows = self.run_query(query) + + for row in rows: + row["sum_bytes_sent"] = int(row.get("sum_bytes_sent",0)) + row["sum_bytes_hit"] = int(row.get("sum_bytes_hit",0)) + row["avg_bandwidth"] = int(float(row.get("avg_bandwidth",0))) + row["time_delta"] = float(row.get("time_delta",0.0))/1000.0 + + elapsed = (timeStop-timeStart)/1000 + KBps = int(row.get("sum_bytes_sent",0)) / elapsed / 1024 + row["KBps"] = KBps + + return rows + + def sites_from_usage(self, rows, nodes_to_sites={}): + sites = {} + for row in rows: + hostname = row["hostname"] + + if hostname in nodes_to_sites: + site_name = nodes_to_sites[hostname] + else: + parts = hostname.split(".") + if len(parts)<=2: + continue + site_name = parts[1] + + if not (site_name in sites): + row = row.copy() + row["site"] = site_name + row["max_avg_bandwidth"] = row["avg_bandwidth"] + # sites table doesn't care about hostnames or avg_bandwidth + del row["hostname"] + del row["avg_bandwidth"] + sites[site_name] = row + else: + site_row = sites[site_name] + site_row["sum_bytes_sent"] = site_row["sum_bytes_sent"] + row["sum_bytes_sent"] + site_row["sum_bytes_hit"] = site_row["sum_bytes_hit"] + row["sum_bytes_hit"] + site_row["max_avg_bandwidth"] = max(site_row["max_avg_bandwidth"], row["avg_bandwidth"]) + site_row["time_delta"] = max(site_row["time_delta"], row["time_delta"]) + + return sites.values() + + def get_usage_sites(self, cp=None, slice=None, timeStart=-HOUR_MS, timeStop=-1): + rows = self.get_usage(cp=cp, slice=slice, 
timeStart=timeStart, timeStop=timeStop) + + return self.sites_from_usage(rows) + + def dump_table(self, rows, keys=None): + if not keys: + keys = rows[0].keys() + + lens = {} + for key in keys: + lens[key] = len(key) + + for row in rows: + for key in keys: + thislen = len(str(row.get(key,""))) + lens[key] = max(lens.get(key,0), thislen) + + for key in keys: + print "%*s" % (lens[key], key), + print + + for row in rows: + for key in keys: + print "%*s" % (lens[key], str(row.get(key,""))), + print + +class HpcQueryThread(HpcQuery, threading.Thread): + def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None, nodes_to_sites={}): + threading.Thread.__init__(self) + HpcQuery.__init__(self) + self.daemon = True + self.interval = interval + self.timeStart = timeStart + self.nodes_to_sites = nodes_to_sites + self.slice = slice + self.cp = cp + self.data_version = 0 + self.please_die = False + self.update_time = time.time() + self.start() + + def is_stalled(self): + if time.time()-self.update_time > 300: + return True + else: + return False + + def run(self): + while not self.please_die: + try: + self.rows = self.get_usage(timeStart=self.timeStart, cp=self.cp, slice=self.slice) + self.site_rows = self.sites_from_usage(self.rows, self.nodes_to_sites) + self.update_time = time.time() + self.new_data() + self.data_version += 1 + except: + file("/tmp/hpcquery_fail.txt","a").write(traceback.format_exc() + "\n") + time.sleep(self.interval) + + def new_data(self): + pass + +class HpcDumpThread(HpcQueryThread): + def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None): + HpcQueryThread.__init__(self, interval, slice, timeStart, cp) + + def new_data(self): + os.system("clear") + + print "update %d, data for last %d minutes" % (self.data_version, -self.timeStart/1000/60) + print + + self.dump_table(self.rows, ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"]) + print + self.dump_table(self.site_rows, ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"]) + print + + +def main_old(): + hq = HpcQuery() +# print hq.mapping + + print "5 minute" + hq.dump_table(hq.get_usage(timeStart=-MINUTE_MS*5), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"]) + print + hq.dump_table(hq.get_usage_sites(timeStart=-MINUTE_MS*5), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"]) + print + + print "1 hour" + hq.dump_table(hq.get_usage(), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"]) + print + hq.dump_table(hq.get_usage_sites(), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"]) + print + + print "24 hours" + hq.dump_table(hq.get_usage(timeStart=-HOUR_MS*24), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"]) + hq.dump_table(hq.get_usage_sites(timeStart=-HOUR_MS*24), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"]) + print + +def main(): + hd = HpcDumpThread() + while True: + time.sleep(30) + +if __name__ == "__main__": + main() -- 2.47.0
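
A note on verifying the manual setup the README calls for: the sketch below is not part of the patch, and the file name quick_check.py and the install path /opt/planetstack/hpc_wizard are assumptions. It simply reuses BigQueryAnalytics from bigquery_analytics.py (the same calls its own main() makes), so the first run exercises the OAuth flow against client_secrets.json and writes bigquery_credentials.dat; subsequent runs use the stored token.

    # quick_check.py -- hypothetical sanity check, not part of this patch.
    # Assumes the hpc_wizard sources and client_secrets.json are installed in
    # /opt/planetstack/hpc_wizard as described in the README.
    import sys
    sys.path.append("/opt/planetstack/hpc_wizard")

    from bigquery_analytics import BigQueryAnalytics

    bq = BigQueryAnalytics(table="demoevents")

    # "%hostname" and "%bytes_sent" are logical field names; run_query() rewrites
    # them to the real BigQuery column names fetched from cloud-scrutiny, submits
    # the job (running the OAuth flow on first use), and maps the schema back.
    rows = bq.run_query("SELECT %hostname, SUM(%bytes_sent) FROM [vicci.demoevents] GROUP BY %hostname")
    bq.dump_table(rows)

If bigquery_credentials.dat is missing or stale, run_query() drops into the interactive oauth2client run() flow; a missing client_secrets.json, by contrast, fails earlier, when flow_from_clientsecrets() is evaluated at module import time.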