# planetstack/hpc_wizard/hpc_wizard.py

import datetime
import os
import operator
import socket
import pytz
import json
import random
import sys
import time
import traceback

if os.path.exists("/home/smbaker/projects/vicci/plstackapi/planetstack"):
    sys.path.append("/home/smbaker/projects/vicci/plstackapi/planetstack")
else:
    sys.path.append("/opt/planetstack")

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings")
from django import db
from django.db import connection
from core.models import Slice, Sliver, ServiceClass, Reservation, Tag, Network, User, Node, Image, Deployment, Site, NetworkTemplate, NetworkSlice, Service
from hpc.models import HpcService, ServiceProvider, ContentProvider, OriginServer, CDNPrefix

# Amount of time, in milliseconds, of HPC statistics to query.
QUERY_TIME = 150000

# Constants used for computing 'hotness'
#    BLUE_LOAD = load (bytes/s, i.e. 5 MB/s) which should map to 0.0 on the hotness scale
#    RED_LOAD  = load (bytes/s, i.e. 15 MB/s) which should map to 1.0 on the hotness scale
BLUE_LOAD = 5000000
RED_LOAD = 15000000

MAX_LOAD = RED_LOAD

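# Worked example of the hotness mapping (illustrative numbers): a per-node
# average of 7 MB/s lies 2 MB/s above BLUE_LOAD on a 10 MB/s scale, so
#     hotness = (7000000 - BLUE_LOAD) / (RED_LOAD - BLUE_LOAD) = 0.2
# Loads at or below BLUE_LOAD clamp to 0.0; loads at or above RED_LOAD clamp to 1.0.
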
def log(what, showdate=True):
    try:
        f = open("/tmp/scott-hpcwizard.log", "a")
        if showdate:
            f.write(time.strftime("%Y-%m-%d %H:%M:%S ", time.gmtime()))
        f.write("%s\n" % what)
        f.close()
    except:
        pass  # logging is best-effort; never let it break the caller

def log_exc(what):
    log(what)
    log(traceback.format_exc(), showdate=False)

def avg(x):
    # guard against an empty sequence, which would otherwise raise
    # ZeroDivisionError (e.g. when no sites have HPC nodes)
    if not x:
        return 0.0
    return float(sum(x)) / len(x)

def format_float(x):
    try:
        return "%10.5f" % x
    except:
        return str(x)

class HpcWizard:
    def __init__(self):
        try:
            self.hpcService = HpcService.objects.get()
        except:
            # OpenCloud.us currently has a Service object instantiated instead
            # of an HpcService. Fall back for now.
            self.hpcService = Service.objects.get(name="HPC Service")

        self.hpcQueryThread = None

    def get_hpc_slices(self):
        try:
            slices = self.hpcService.slices.all()
        except:
            # BUG in data model -- Slice.service has related name 'service' and
            #                      it should be 'slices'
            slices = self.hpcService.service.all()
        return slices

    def get_hpc_slivers(self):
        slivers = []
        for slice in self.get_hpc_slices():
            for sliver in slice.slivers.all():
                slivers.append(sliver)
        return slivers

    def fill_site_nodes(self, site, hpc_slivers=None):
        if hpc_slivers is None:
            hpc_slivers = self.get_hpc_slivers()

        site.availNodes = []
        site.hpcNodes = []
        for node in site.nodes.all():
            has_hpc = False

            for sliver in node.slivers.all():
                if sliver in hpc_slivers:
                    has_hpc = True

            if has_hpc:
                site.hpcNodes.append(node)
            else:
                site.availNodes.append(node)

    def merge_site_statistics(self, sites):
        """ This does it based on the sum of all bandwidth.

            The issue here is that the computed load reacts immediately to
            the addition or deletion of nodes. i.e. 5 nodes at 80% + 1 node at
            0% = average load 66%.
        """
        site_dict = {}
        for site in self.hpcQueryThread.site_rows:
            site_dict[site["site"]] = site

        for site in sites:
            if site.name in site_dict:
                site.bytes_sent = site_dict[site.name]["sum_bytes_sent"]
                time_delta = site_dict[site.name]["time_delta"]
                # round the measurement window up to the next 30-second boundary
                computed_duration = (int(time_delta/30)+1)*30
                if (computed_duration > 0):
                    site.bandwidth = site.bytes_sent/computed_duration
                if len(site.hpcNodes) > 0:
                    # figure out how many bytes_sent would be represented
                    # by blue and red
                    blue_load = len(site.hpcNodes) * BLUE_LOAD * computed_duration
                    red_load = len(site.hpcNodes) * RED_LOAD * computed_duration
                    max_load = len(site.hpcNodes) * MAX_LOAD * computed_duration

                    # clamp bytes_sent to [blue_load, red_load], then normalize to [0, 1]
                    site.hotness = (min(red_load, max(blue_load, float(site.bytes_sent))) - blue_load) / (red_load - blue_load)
                    site.load = int(min(100, site.bytes_sent * 100 / max_load))

                    open("/tmp/scott2.txt", "a").write("%s %d %0.2f %0.2f %0.2f %0.2f %d\n" % (site.name, site.bytes_sent, blue_load, red_load, site.hotness, time_delta, computed_duration))

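    # Illustrative example (assumed numbers): with 2 hpcNodes and time_delta = 45s,
    # computed_duration rounds up to 60s, so blue_load = 2 * 5000000 * 60 = 600 MB
    # and red_load = 2 * 15000000 * 60 = 1800 MB; a site that sent 1200 MB lands
    # at hotness = (1200 - 600) / (1800 - 600) = 0.5.
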
    def merge_site_statistics_new(self, sites):
        """ This does it based on max load.

            The advantage of this method is that since we're effectively reporting
            the maximally loaded node, we don't get instantaneous reactions
            to adding additional nodes. On the contrary, it will take a while
            for the load to balance from the loaded node to the new less-loaded
            node.
        """
        site_dict = {}
        for site in self.hpcQueryThread.site_rows:
            site_dict[site["site"]] = site

        for site in sites:
            if site.name in site_dict:
                site.max_avg_bandwidth = site_dict[site.name]["max_avg_bandwidth"]
                site.bytes_sent = site_dict[site.name]["sum_bytes_sent"]

                # hotness tracks the busiest node, clamped to [0, 1]
                site.hotness = min(1.0, float(max(BLUE_LOAD, site.max_avg_bandwidth) - BLUE_LOAD) / (RED_LOAD - BLUE_LOAD))
                site.load = int(site.max_avg_bandwidth * 100 / MAX_LOAD)

                # we still need site["bandwidth"] for the summary statistics
                time_delta = site_dict[site.name]["time_delta"]
                computed_duration = (int(time_delta/30)+1)*30
                if (computed_duration > 0):
                    site.bandwidth = site.bytes_sent/computed_duration
                else:
                    site.bandwidth = 0

                if len(site.hpcNodes) > 0:
                    open("/tmp/scott3.txt", "a").write("%s %d %0.2f %d %0.2f\n" % (site.name, site.bytes_sent, site.hotness, site.load, site.bandwidth))

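    # Illustrative example (assumed numbers): if the busiest node averages
    # 10 MB/s, then hotness = (10000000 - BLUE_LOAD) / (RED_LOAD - BLUE_LOAD) = 0.5
    # and load = int(10000000 * 100 / MAX_LOAD) = 66, regardless of how many
    # idle nodes the site has.
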
    def get_sites(self):
        sites = list(Site.objects.all())

        for site in sites:
            self.fill_site_nodes(site, self.get_hpc_slivers())
            site.load = 0
            site.hotness = 0
            site.bandwidth = 0
            site.numNodes = len(site.hpcNodes) + len(site.availNodes)

        if (self.hpcQueryThread is not None) and (self.hpcQueryThread.is_stalled()):
            self.initialize_statistics()

        # merge in the statistics data if it is available
        if self.hpcQueryThread and self.hpcQueryThread.data_version > 0:
            self.merge_site_statistics(sites)

        # django will leak extraordinary amounts of memory without this line
        db.reset_queries()

        return sites

    def get_nodes_to_sites(self):
        nodes_to_sites = {}

        sites = list(Site.objects.all())

        for site in sites:
            for node in site.nodes.all():
                nodes_to_sites[node.name] = site.name

        return nodes_to_sites

    def get_slice_sites(self, slice_name):
        sites = list(Site.objects.all())
        slivers = list(Slice.objects.get(name=slice_name).slivers.all())
        for site in sites:
            self.fill_site_nodes(site, slivers)
        return sites

    def get_sites_for_view(self):
        sites = {}
        for site in self.get_sites():
            if site.name in ["ON.Lab", "I2 Atlanta"]:
                continue

            d = {"lat": float(site.location.latitude),
                 "long": float(site.location.longitude),
                 "health": 0,
                 "numNodes": site.numNodes,
                 "numHPCSlivers": len(site.hpcNodes),
                 "siteUrl": str(site.site_url),
                 "hot": getattr(site, "hotness", 0.0),
                 "load": getattr(site, "load", 0)}
            sites[str(site.name)] = d

        import pprint
        f = open("/tmp/scott.txt", "w")
        pprint.pprint(sites, f)
        f.close()

        return sites

    def get_summary_for_view(self):
        sites = [site for site in self.get_sites() if len(site.hpcNodes) > 0]

        total_slivers = sum([len(site.hpcNodes) for site in sites])
        total_bandwidth = sum([site.bandwidth for site in sites])
        average_cpu = int(avg([site.load for site in sites]))

        return {"total_slivers": total_slivers,
                "total_bandwidth": total_bandwidth,
                "average_cpu": average_cpu}

    def initialize_statistics(self):
        from query import HpcQueryThread

        if (self.hpcQueryThread is not None):
            log("dropping old query thread")
            self.hpcQueryThread.please_die = True
            self.hpcQueryThread = None

        log("launching new query thread")

        nodes_to_sites = self.get_nodes_to_sites()
        self.hpcQueryThread = HpcQueryThread(nodes_to_sites=nodes_to_sites, timeStart=-QUERY_TIME, slice="HyperCache")

    def get_site(self, site_name):
        site = Site.objects.get(name=site_name)
        self.fill_site_nodes(site)
        return site

    def increase_slivers(self, site_name, count):
        site = self.get_site(site_name)
        hpc_slice = self.get_hpc_slices()[0]
        while (len(site.availNodes) > 0) and (count > 0):
            node = site.availNodes.pop()
            hostname = node.name
            sliver = Sliver(name=node.name,
                            slice=hpc_slice,
                            node=node,
                            image=Image.objects.all()[0],
                            creator=User.objects.get(email="scott@onlab.us"),
                            deploymentNetwork=node.deployment,
                            numberCores=1,
                            ip=socket.gethostbyname(hostname))
            sliver.save()

            print "created sliver", sliver

            site.hpcNodes.append(node)

            count = count - 1

    def decrease_slivers(self, site_name, count):
        site = self.get_site(site_name)
        hpc_slices = self.get_hpc_slices()
        while (len(site.hpcNodes) > 0) and (count > 0):
            node = site.hpcNodes.pop()
            for sliver in node.slivers.all():
                if sliver.slice in hpc_slices:
                    print "deleting sliver", sliver
                    sliver.delete()

            site.availNodes.append(node)
            count = count - 1

    def dump(self):
        print "slices:"
        for slice in self.get_hpc_slices():
            print "  ", slice

        print "sites:"
        print "%20s %10s %10s %10s %10s %10s %10s" % ("name", "avail", "hpc", "lat", "long", "sent", "hot")
        for site in self.get_sites():
            print "%20s %10d %10d %10s %10s %10d %10.2f" % (site.name,
                                                            len(site.availNodes),
                                                            len(site.hpcNodes),
                                                            format_float(site.location.latitude),
                                                            format_float(site.location.longitude),
                                                            getattr(site, "bytes_sent", 0),
                                                            getattr(site, "hotness", 0.5))

        #print "slivers:"
        #for sliver in self.get_hpc_slivers():
        #    print "  ", sliver

glo_hpc_wizard = None

def get_hpc_wizard():
    global glo_hpc_wizard

    if (glo_hpc_wizard is None):
        glo_hpc_wizard = HpcWizard()
#        glo_hpc_wizard.initialize_statistics()

    return glo_hpc_wizard

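# Example usage from calling code (hypothetical caller, shown for illustration):
#     wizard = get_hpc_wizard()
#     summary = wizard.get_summary_for_view()   # {"total_slivers": ..., ...}
#     sites = wizard.get_sites_for_view()       # {site_name: {"lat": ..., ...}}
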
def main():
    x = HpcWizard()

    # initialize the statistics thread, and wait for some data to show up
    x.initialize_statistics()
    while x.hpcQueryThread.data_version == 0:
        time.sleep(1)

    x.dump()

    # quick test of the increase / decrease functions
    x.increase_slivers("Princeton", 1)
    x.decrease_slivers("Princeton", 1)

if __name__ == "__main__":
    main()