915f88c4672aaa33053741da8dcf9205206db93b
[nepi.git] / src / nepi / testbeds / planetlab / resourcealloc.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import itertools
5 import functools
6 import operator
7 import random
8 import collections
9 import heapq
10
11 from nepi.util.settools import setclusters
12 from nepi.util.settools import classify
13
class ResourceAllocationError(Exception):
    """Raised by alloc() when the requests cannot all be granted a node."""
16
def multicardinal(multiset):
    """
    Returns the total number of elements in a multiset.

    The multiset is a mapping from element to multiplicity; the result is
    the sum of all multiplicities (0 for an empty mapping).
    """
    # Only the multiplicities matter, so iterate the values directly. This
    # also works with any mapping type, not just Python 2 dicts.
    return sum(multiset.values())
19
def avail(cls, partition):
    """
    Returns the set of elements available to class `cls`.

    The result is a new set holding the union of every component that
    classify.classComponents(cls, partition) yields. It is empty when
    there are no components.
    """
    # Seeding the fold with an empty set guarantees a fresh set is returned
    # and that no partition component is mutated.
    return functools.reduce(operator.or_,
        classify.classComponents(cls, partition),
        set())
25
26 def _log(logstream, message, *args, **kwargs):
27     if logstream:
28         if args:
29             logstream.write(message % args)
30         elif kwargs:
31             logstream.write(message % kwargs)
32         else:
33             logstream.write(message)
34         logstream.write('\n')
35
def alloc(requests, logstream = None, nonseparable = False, saveinteresting = None, backtracklim = 100000000, verbose = True, sample = random.sample):
    """
    Takes an iterable over requests, which are iterables of candidate node ids,
    and returns a specific node id for each request (if successful).
    
    If it cannot, it will raise an ResourceAllocationError.
    
    Params:
    
        requests: iterable of iterables of candidate node ids. Each request
            is materialized as a set.
        
        logstream: object with a write() method that receives progress and
            statistics messages. Nothing is logged when it is None.
        
        nonseparable: when true, the requests are not split into
            independent clusters and are solved as a single problem.
        
        saveinteresting: optional list. The materialized requests are
            appended to it if the backtrack counter reaches exactly
            0x1400000 (a marker for pathological inputs).
        
        backtracklim: once the backtrack counter exceeds this value, each
            search level stops trying further classes.
        
        verbose: when true, cluster progress and the final statistics are
            always logged. Otherwise statistics are only logged for
            failures or when backtracks/skipped branches occurred.
        
        sample: callable(population, k) returning k elements of population.
            It picks the concrete nodes once the counts are decided and is
            also used for every sub-problem of a separable request.
    
    Returns a list with one node id per request, in request order.
    """
    
    # First, materialize the request iterable
    requests = [set(request) for request in requests]
    
    # Classify all candidates
    partition = setclusters.disjoint_partition(*requests)
    
    # Classify requests: c_reqlist maps a class id to its requests,
    # c_req maps a class id to how many requests it has.
    c_reqlist = classify.classify(requests, partition)
    c_req = dict(
        (c, len(r))
        for c, r in c_reqlist.items()
    )
    
    # Classify universe: size of each partition component
    c_uni = [len(part) for part in partition]
    
    # Perform invariant sanity checks
    if multicardinal(c_req) > sum(c_uni):
        raise ResourceAllocationError("Insufficient resources to grant request")
    
    for c, nreq in c_req.items():
        if nreq > len(avail(c, partition)):
            raise ResourceAllocationError(
                "Insufficient resources to grant request, empty categories %s" % (
                    [i for i in range(len(c)) if classify.classContains(c, i)],
                ))

    # Test for separability
    if nonseparable:
        clusters = []
    else:
        components = [
            classify.classMembers(c, partition)
            for c in c_req
        ]
        clusters = setclusters.disjoint_sets(*components)
    
    if len(clusters) > 1:
        if verbose:
            _log(logstream, "\nDetected %d clusters", len(clusters))
        
        # Requests are separable
        # Solve each part separately, then rejoin them
        
        # Build a class for each cluster
        clustermaps = [
            classify.getClass(
                functools.reduce(operator.or_, cluster, set()),
                partition )
            for cluster in clusters
        ]
        
        # Build a plan: assign a cluster to each request
        plan = []
        for cluster_class in clustermaps:
            plan_reqs = []
            for c, c_requests in c_reqlist.items():
                if classify.isSubclass(cluster_class, c):
                    plan_reqs.extend(c_requests)
            plan.append(plan_reqs)
        
        # Execute the plan
        partial_results = []
        for i, plan_req in enumerate(plan):
            if verbose:
                _log(logstream, "Solving cluster %d/%d", i+1, len(plan))
            # The caller's sampler must be forwarded, otherwise clustered
            # requests would silently fall back to random.sample.
            partial_results.append(alloc(plan_req, 
                logstream, 
                nonseparable = True,
                saveinteresting = saveinteresting,
                backtracklim = backtracklim,
                verbose = verbose,
                sample = sample))
        
        # Join results. Requests are matched by object identity: the plan
        # holds the very same set objects as the materialized request list.
        if verbose:
            _log(logstream, "Joining partial results")
        reqmap = dict(
            (id(request), idx)
            for idx, request in enumerate(requests)
        )
        joint = [None] * len(requests)
        for partial_result, partial_requests in zip(partial_results, plan):
            for assignment, partial_request in zip(partial_result, partial_requests):
                joint[reqmap[id(partial_request)]] = assignment
        
        return joint
    
    # Non-separable request, solve
    
    # partial: class id -> list of (partition index, count) decisions
    # Pavail:  free node count per partition component
    # Gavail:  free node count per request class
    # req:     pending request count per class (satisfied classes are removed)
    partial = collections.defaultdict(list)
    Pavail = list(c_uni)
    Gavail = dict(
        (c, len(avail(c, partition)))
        for c in c_req
    )
    req = dict(c_req)
    
    classContains = classify.classContains
    
    stats = [
        0, # ops
        0, # back tracks
        0, # skipped branches
    ]
    
    def recursive_alloc():
        """
        Backtracking search over how many nodes each class takes from each
        partition component. It mutates partial, Pavail, Gavail and req,
        and restores them when a branch fails.
        
        Returns True when every request has been satisfied, False otherwise.
        """
        # Successful termination condition: all requests satisfied
        if not req:
            return True
        
        # Try in cardinality order
        if quickstage:
            order = heapq.nsmallest(2, req, key=Gavail.__getitem__)
        else:
            order = sorted(req, key=Gavail.__getitem__)
        
        # Do backtracking on those whose cardinality leaves a choice
        # Force a pick when it does not
        if order and (Gavail[order[0]] <= 1
                      or classify.classCardinality(order[0]) <= 1):
            order = order[:1]
        
        for c in order:
            nreq = req[c]
            
            # A class id is a string of bits; a "1" at position i marks
            # partition component i as usable by the class.
            for i, bit in enumerate(c):
                if bit != "1" or not Pavail[i]:
                    continue
                
                stats[0] += 1 # ops+1
                
                subreq = min(Pavail[i], nreq)
                
                # branch sanity check: taking subreq nodes from component i
                # must not leave another class with fewer nodes than it needs
                starves_other = any(
                    c2 != c
                    and classContains(c2, i)
                    and (Gavail[c2] - subreq) < req.get(c2, 0)
                    for c2 in Gavail
                )
                if starves_other:
                    # Fail branch, don't even try
                    stats[2] += 1 # skipped branches + 1
                    continue
                
                # forward track
                partial[c].append((i, subreq))
                Pavail[i] -= subreq
                
                for c2 in Gavail:
                    if classContains(c2, i):
                        Gavail[c2] -= subreq
                
                if subreq < nreq:
                    req[c] -= subreq
                else:
                    del req[c]
                
                # Try to solve recursively
                success = recursive_alloc()
                
                if success:
                    return success
                
                # Back track
                del partial[c][-1]
                Pavail[i] += subreq
                
                for c2 in Gavail:
                    if classContains(c2, i):
                        Gavail[c2] += subreq
                
                if subreq < nreq:
                    req[c] += subreq
                else:
                    req[c] = subreq
                
                stats[1] += 1 # backtracks + 1
                
                # Report progress every 0x10000 backtracks
                if (logstream or (saveinteresting is not None)) and (stats[1] & 0xffff) == 0:
                    _log(logstream, "%r\n%r\n... stats: ops=%d, backtracks=%d, skipped=%d", Gavail, req,
                        *stats)
                    
                    if stats[1] == 0x1400000:
                        # Interesting case, log it out
                        _log(logstream, "... interesting case: %r", requests)
                        
                        if saveinteresting is not None:
                            saveinteresting.append(requests)
            
            if stats[1] > backtracklim:
                break
        
        # We tried and tried... and failed
        return False
    
    # First try quickly (assign most selective first exclusively)
    quickstage = True
    success = recursive_alloc()
    if not success:
        # If it fails, retry exhaustively (try all assignment orders)
        quickstage = False
        success = recursive_alloc()
    
    if verbose or (not success or stats[1] or stats[2]):
        _log(logstream, "%s with stats: ops=%d, backtracks=%d, skipped=%d",
            ("Succeeded" if success else "Failed"),
            *stats)
    
    if not success:
        raise ResourceAllocationError("Insufficient resources to grant request")
    
    # Perform actual assignment: turn the (component, count) decisions into
    # concrete node ids, consuming each component's pool as nodes are picked.
    pools = [set(part) for part in partition]
    solution = {}
    for c, partial_assignments in partial.items():
        psol = set()
        for i, nreq in partial_assignments:
            part = pools[i]
            if len(part) < nreq:
                raise AssertionError("Cannot allocate resources for supposedly valid solution!")
            # Hand the sampler a sequence: random.sample no longer accepts
            # sets on recent Python versions.
            assigned = set(sample(list(part), nreq))
            psol |= assigned
            part -= assigned
        solution[c] = psol
    
    # Format solution for the caller (return a node id for each request)
    class_of = {}
    for c, members in c_reqlist.items():
        for request in members:
            class_of[id(request)] = c
    
    return [
        solution[class_of[id(request)]].pop()
        for request in requests
    ]
288
289
if __name__ == '__main__':
    def test():
        """
        Self-test for alloc(): checks a few hand-picked tough cases whose
        solvability is known, then fuzzes alloc() with random requests and
        reports the success rate of each fuzzing campaign.
        """
        import random
        import sys
        import traceback
        
        # (solvable, requests) pairs
        toughcases = [
          (False,
            [[9, 11, 12, 14, 16, 17, 18, 20, 21], 
             [2], 
             [2], 
             [4, 5, 6, 7, 8, 11, 12, 13, 18, 22], 
             [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22], 
             [6, 10, 11, 13, 14, 15, 16, 18, 20], 
             [3, 7, 8, 9, 10, 12, 14, 17, 22], 
             [0, 1, 3, 4, 5, 6, 7, 8, 10, 13, 14, 17, 19, 21, 22], 
             [16, 22]]),
          (False,
            [[2, 10, 0, 3, 4, 8], 
             [4, 1, 6, 10, 2, 0, 5, 9, 8, 7], 
             [8, 3, 0, 2, 1, 4, 10, 7, 5], 
             [8], 
             [2], 
             [2, 8], 
             [2, 7, 8, 3, 1, 0, 9, 10, 5, 4, 6], 
             [2, 4, 8, 10, 1, 3, 9], 
             [3, 0, 5]]),
          (True,
            [[2, 10, 0, 3, 4, 8], 
             [4, 1, 6, 10, 2, 0, 5, 9, 8, 7], 
             [8, 3, 0, 2, 1, 4, 10, 7, 5], 
             [8], 
             [2, 8], 
             [2, 7, 8, 3, 1, 0, 9, 10, 5, 4, 6], 
             [2, 4, 8, 10, 1, 3, 9], 
             [3, 0, 5]]),
        ]
        
        # Test tough cases
        for n, (solvable, req) in enumerate(toughcases):
            print("Trying #R = %4d, #S = %4d (tough case %d)" % (
                len(req),
                len(functools.reduce(operator.or_, map(set, req), set())),
                n))
            try:
                solution = alloc(req, sys.stdout, verbose=False)
            except ResourceAllocationError:
                if solvable:
                    raise AssertionError("Case %r had a solution, but got none" % (req,))
                print("  OK - allocation not possible")
            else:
                if not solvable:
                    raise AssertionError("Case %r had no solution, but got %r" % (req, solution))
                print("  OK - allocation successful")
        
        interesting = []
        
        # Per-campaign counters. They are kept in dicts so the fuzzing
        # helper can update them and the totals survive an aborted run.
        mostlypossible = {"tried": 0, "ok": 0}
        mostlyimpossible = {"tried": 0, "ok": 0}
        huge = {"tried": 0, "ok": 0}
        
        def mostly_impossible_sizes():
            nreq = random.randint(1, 20)
            nsrv = random.randint(max(1, nreq - 5), 50)
            return nreq, nsrv, 1
        
        def mostly_possible_sizes():
            nreq = random.randint(1, 10)
            nsrv = random.randint(nreq, 100)
            return nreq, nsrv, min(nreq, nsrv)
        
        def huge_sizes():
            nreq = random.randint(1, 500)
            nsrv = random.randint(1, 8000)
            return nreq, nsrv, min(nreq, nsrv)
        
        def fuzz(tally, rounds, progress, pick_sizes):
            """
            Runs `rounds` random allocations. pick_sizes() returns
            (number of requests, number of servers, fewest candidates per
            request); attempts and successes are counted in `tally`.
            """
            for i in range(rounds):
                nreq, nsrv, fewest = pick_sizes()
                srv = range(nsrv)
                req = [
                    random.sample(srv, random.randint(fewest, nsrv))
                    for j in range(nreq)
                ]
                sys.stdout.write(progress % (i, nreq, nsrv))
                sys.stdout.flush()
                tally["tried"] += 1
                try:
                    alloc(req, sys.stdout, saveinteresting = interesting, verbose=False)
                    tally["ok"] += 1
                    sys.stdout.write("  OK - allocation successful  \r")
                except ResourceAllocationError:
                    sys.stdout.write("  OK - allocation not possible  \r")
                except KeyboardInterrupt:
                    print("ABORTING CASE %r" % (req,))
                    raise
                sys.stdout.flush()
        
        try:
            # Fuzzer - mostly impossible cases
            fuzz(mostlyimpossible, 10000,
                "Trying %5d: #R = %4d, #S = %4d... ", mostly_impossible_sizes)
            
            # Fuzzer - mostly possible cases
            fuzz(mostlypossible, 10000,
                "Trying %5d: #R = %4d, #S = %4d... ", mostly_possible_sizes)
            
            # Fuzzer - biiig cases
            fuzz(huge, 10,
                "Trying %4d: #R = %4d, #S = %4d... ", huge_sizes)
        except KeyboardInterrupt:
            # User abort: still report what was gathered so far
            print("ABORTING TEST")
        except Exception:
            # Unexpected failure: show it instead of hiding it, then report
            traceback.print_exc()
            print("ABORTING TEST")
        
        print("\nSuccess rates:")
        for label, tally in (
                ("Mostly possible", mostlypossible),
                ("Mostly impossible", mostlyimpossible),
                ("Huge", huge)):
            print("  %s: %d/%d (%.2f%%)" % (
                label, tally["ok"], tally["tried"],
                100.0 * tally["ok"] / max(1, tally["tried"])))
        
        if interesting:
            print("%d interesting requests:" % (len(interesting),))
            for n, req in enumerate(interesting):
                print("Interesting request %d/%d: %r" % (n, len(interesting), req))
    test()
430