a5385c3dbd9a0fab2baf06d1ca97cadeef3e48f5
[nepi.git] / src / nepi / testbeds / planetlab / resourcealloc.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import itertools
5 import functools
6 import operator
7 import random
8 import collections
9 import heapq
10
11 from nepi.util.settools import setclusters
12 from nepi.util.settools import classify
13
class ResourceAllocationError(Exception):
    """
    Raised by alloc() when the requested resources cannot be granted.
    """
16
def multicardinal(multiset):
    """
    Returns the total element count of a multiset.

    'multiset' is a mapping from element to the number of times that
    element occurs; the result is the sum of all those counts (0 for
    an empty mapping).
    """
    # Only the multiplicities matter, so there is no need to walk the keys
    return sum(multiset.values())
19
def avail(cls, partition):
    """
    Returns everything available to class 'cls' within 'partition'.

    The components of the class (as given by classify.classComponents)
    are combined with the | operator, which is a union when the parts
    of the partition are sets.
    """
    return functools.reduce(operator.or_,
        classify.classComponents(cls, partition))
24
def _log(logstream, message, *args, **kwargs):
    """
    Writes one line to 'logstream', doing nothing if the stream is falsy.

    'message' is %-formatted with the positional arguments when there are
    any, otherwise with the keyword arguments when there are any, and is
    written verbatim when neither was given. A newline is written after it.
    """
    if not logstream:
        return

    if args:
        text = message % args
    elif kwargs:
        text = message % kwargs
    else:
        text = message

    logstream.write(text)
    logstream.write('\n')
34
def alloc(requests, logstream = None, nonseparable = False, saveinteresting = None, backtracklim = 100000000, verbose = True, sample = random.sample):
    """
    Takes an iterable over requests, which are iterables of candidate node ids,
    and returns a specific node id for each request (if successful).
    
    If it cannot, it will raise a ResourceAllocationError.
    
    Parameters:
        requests: iterable of iterables of candidate node ids, one entry
            per request.
        logstream: object with a write() method that receives progress
            and statistics, or None to disable logging.
        nonseparable: when true, skip the separability test and solve all
            the requests as one single problem.
        saveinteresting: optional list; the request list is appended to it
            if the search reaches 0x1400000 backtracks.
        backtracklim: number of backtracks above which the search stops
            exploring further request classes.
        verbose: when true, log cluster detection and progress, and always
            log the final statistics (otherwise they are logged only when
            the search failed, backtracked or skipped branches).
        sample: callable(population, k) used to pick the concrete nodes
            out of each part of the partition; expected to return k
            distinct elements, like random.sample does.
    
    Returns:
        A list with one node id per request, in the same order as 'requests'.
        An empty request list yields an empty list.
    
    Raises:
        ResourceAllocationError: when the requests cannot all be granted.
    """
    
    # First, materialize the request iterable
    requests = [set(request) for request in requests]
    
    # Nothing was requested, so there is nothing to allocate
    if not requests:
        return []
    
    # Classify all candidates
    partition = setclusters.disjoint_partition(*requests)
    
    # Classify requests: c_reqlist maps each class to its requests,
    # c_req maps each class to the number of requests in it
    c_reqlist = classify.classify(requests, partition)
    c_req = dict(
        (c, len(r))
        for c, r in c_reqlist.items()
    )
    
    # Classify universe: number of candidates in each part
    c_uni = [len(part) for part in partition]
    
    # Perform invariant sanity checks
    if multicardinal(c_req) > sum(c_uni):
        raise ResourceAllocationError("Insufficient resources to grant request")
    
    for c, nreq in c_req.items():
        if nreq > len(avail(c, partition)):
            raise ResourceAllocationError(
                "Insufficient resources to grant request, empty categories %s" % (
                    [i for i in range(len(c)) if classify.classContains(c, i)],
                ))
    
    # Test for separability
    if nonseparable:
        clusters = []
    else:
        components = [
            classify.classMembers(c, partition)
            for c in c_req
        ]
        clusters = setclusters.disjoint_sets(*components)
    
    if len(clusters) > 1:
        if verbose:
            _log(logstream, "\nDetected %d clusters", len(clusters))
        
        # Requests are separable
        # Solve each part separately, then rejoin them
        
        # Build a class for each cluster
        clustermaps = [
            classify.getClass(
                functools.reduce(operator.or_, cluster),
                partition )
            for cluster in clusters
        ]
        
        # Build a plan: assign a cluster to each request
        plan = []
        for cluster_class in clustermaps:
            plan_reqs = []
            for c, c_requests in c_reqlist.items():
                if classify.isSubclass(cluster_class, c):
                    plan_reqs.extend(c_requests)
            plan.append(plan_reqs)
        
        # Execute the plan
        partial_results = []
        for i, plan_req in enumerate(plan):
            if verbose:
                _log(logstream, "Solving cluster %d/%d", i+1, len(plan))
            # Forward every option, including the sampler, so that each
            # cluster is solved exactly as the whole problem would be
            partial_results.append(alloc(plan_req, 
                logstream, 
                nonseparable = True,
                saveinteresting = saveinteresting,
                backtracklim = backtracklim,
                verbose = verbose,
                sample = sample))
        
        # Join results
        if verbose:
            _log(logstream, "Joining partial results")
        # Requests are tracked by identity: equal requests are still
        # distinct entries that need one assignment each
        reqmap = dict(
            (id(request), idx)
            for idx, request in enumerate(requests)
        )
        joint = [None] * len(requests)
        for partial_result, partial_requests in zip(partial_results, plan):
            for assignment, partial_request in zip(partial_result, partial_requests):
                joint[reqmap[id(partial_request)]] = assignment
        
        return joint
    
    # Non-separable request, solve
    
    # State shared with (and mutated in place by) recursive_alloc:
    #   partial: class -> list of (part index, amount) picks made so far
    #   Pavail:  candidates still free in each part
    #   Gavail:  candidates still free for each class
    #   req:     requests still pending for each class
    partial = collections.defaultdict(list)
    Pavail = list(c_uni)
    Gavail = dict(
        (c, len(avail(c, partition)))
        for c in c_req
    )
    req = dict(c_req)
    
    classContains = classify.classContains
    
    stats = [
        0, # ops
        0, # back tracks
        0, # skipped branches
    ]
    
    def recursive_alloc():
        # Successful termination condition: all requests satisfied
        if not req:
            return True
        
        # Try in cardinality order
        if quickstage:
            order = heapq.nsmallest(2, req, key=Gavail.__getitem__)
        else:
            order = sorted(req, key=Gavail.__getitem__)
        
        # Do backtracking on those whose cardinality leaves a choice
        # Force a pick when it does not
        if order and (Gavail[order[0]] <= 1
                      or classify.classCardinality(order[0]) <= 1):
            order = order[:1]
        
        for c in order:
            nreq = req[c]
            # A class is a string with one "0"/"1" character per part
            for i, bit in enumerate(c):
                if bit == "1" and Pavail[i]:
                    stats[0] += 1 # ops+1
                    
                    subreq = min(Pavail[i], nreq)
                    
                    # branch sanity check
                    skip = False
                    for c2, navail in Gavail.items():
                        if c2 != c and classContains(c2, i) and (navail - subreq) < req.get(c2,0):
                            # Fail branch, don't even try
                            skip = True
                            break
                    if skip:
                        stats[2] += 1 # skipped branches + 1
                        continue
                    
                    # forward track
                    partial[c].append((i,subreq))
                    Pavail[i] -= subreq
                    
                    for c2 in Gavail:
                        if classContains(c2, i):
                            Gavail[c2] -= subreq
                    
                    if subreq < nreq:
                        req[c] -= subreq
                    else:
                        del req[c]
                    
                    # Try to solve recursively
                    success = recursive_alloc()
                    
                    if success:
                        return success
                    
                    # Back track
                    del partial[c][-1]
                    Pavail[i] += subreq
                    
                    for c2 in Gavail:
                        if classContains(c2, i):
                            Gavail[c2] += subreq
                    
                    if subreq < nreq:
                        req[c] += subreq
                    else:
                        req[c] = subreq
                    
                    stats[1] += 1 # backtracks + 1
                    
                    # Periodic progress report, every 0x10000 backtracks
                    if (logstream or (saveinteresting is not None)) and (stats[1] & 0xffff) == 0:
                        _log(logstream, "%r\n%r\n... stats: ops=%d, backtracks=%d, skipped=%d", Gavail, req,
                            *stats)
                        
                        if stats[1] == 0x1400000:
                            # Interesting case, log it out
                            _log(logstream, "... interesting case: %r", requests)
                            
                            if saveinteresting is not None:
                                saveinteresting.append(requests)
            if stats[1] > backtracklim:
                break
        
        # We tried and tried... and failed
        return False
    
    # First try quickly (assign most selective first exclusively)
    quickstage = True
    success = recursive_alloc()
    if not success:
        # If it fails, retry exhaustively (try all assignment orders)
        quickstage = False
        success = recursive_alloc()
    
    if verbose or (not success or stats[1] or stats[2]):
        _log(logstream, "%s with stats: ops=%d, backtracks=%d, skipped=%d",
            ("Succeeded" if success else "Failed"),
            *stats)
    
    if not success:
        raise ResourceAllocationError("Insufficient resources to grant request")
    
    # Perform actual assignment: turn the per-part amounts found by the
    # search into concrete node ids, removing each pick from its part
    pools = [set(part) for part in partition]
    solution = {}
    for c, partial_assignments in partial.items():
        psol = set()
        for i, nreq in partial_assignments:
            part = pools[i]
            if len(part) < nreq:
                raise AssertionError("Cannot allocate resources for supposedly valid solution!")
            assigned = set(sample(part, nreq))
            psol |= assigned
            part -= assigned
        solution[c] = psol
    
    # Format solution for the caller (return a node id for each request)
    reqclass = {}
    for c, reqs in c_reqlist.items():
        for request in reqs:
            reqclass[id(request)] = c
    
    req_solution = []
    for request in requests:
        req_solution.append(solution[reqclass[id(request)]].pop())
    
    return req_solution
287
288
if __name__ == '__main__':
    def test():
        """
        Self-test for alloc().
        
        First checks a few hand-picked tough cases whose solvability is
        known, then fuzzes alloc() with random requests in three stages
        and prints the success rate of each stage. Progress and results
        go to standard output.
        """
        import random
        import sys
        import traceback
        
        toughcases = [
          (False,
            [[9, 11, 12, 14, 16, 17, 18, 20, 21], 
             [2], 
             [2], 
             [4, 5, 6, 7, 8, 11, 12, 13, 18, 22], 
             [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22], 
             [6, 10, 11, 13, 14, 15, 16, 18, 20], 
             [3, 7, 8, 9, 10, 12, 14, 17, 22], 
             [0, 1, 3, 4, 5, 6, 7, 8, 10, 13, 14, 17, 19, 21, 22], 
             [16, 22]]),
          (False,
            [[2, 10, 0, 3, 4, 8], 
             [4, 1, 6, 10, 2, 0, 5, 9, 8, 7], 
             [8, 3, 0, 2, 1, 4, 10, 7, 5], 
             [8], 
             [2], 
             [2, 8], 
             [2, 7, 8, 3, 1, 0, 9, 10, 5, 4, 6], 
             [2, 4, 8, 10, 1, 3, 9], 
             [3, 0, 5]]),
          (True,
            [[2, 10, 0, 3, 4, 8], 
             [4, 1, 6, 10, 2, 0, 5, 9, 8, 7], 
             [8, 3, 0, 2, 1, 4, 10, 7, 5], 
             [8], 
             [2, 8], 
             [2, 7, 8, 3, 1, 0, 9, 10, 5, 4, 6], 
             [2, 4, 8, 10, 1, 3, 9], 
             [3, 0, 5]]),
        ]
        
        # Test tough cases
        for n,(solvable,req) in enumerate(toughcases):
            print("Trying #R = %4d, #S = %4d (tough case %d)" % (len(req), len(set().union(*req)), n))
            try:
                solution = alloc(req, sys.stdout, verbose=False)
            except ResourceAllocationError:
                if solvable:
                    raise AssertionError("Case %r had a solution, but got none" % (req,))
                print("  OK - allocation not possible")
            else:
                if not solvable:
                    raise AssertionError("Case %r had no solution, but got %r" % (req, solution))
                print("  OK - allocation successful")
        
        interesting = []
        
        # [successes, attempts] for each fuzzing stage; kept in lists so
        # the counts gathered so far survive an aborted run
        mostlypossible = [0, 0]
        mostlyimpossible = [0, 0]
        huge = [0, 0]
        
        def sizes_mostlyimpossible():
            nreq = random.randint(1,20)
            nsrv = random.randint(max(1,nreq-5),50)
            return nreq, nsrv, 1
        
        def sizes_mostlypossible():
            nreq = random.randint(1,10)
            nsrv = random.randint(nreq,100)
            return nreq, nsrv, min(nreq,nsrv)
        
        def sizes_huge():
            nreq = random.randint(1,500)
            nsrv = random.randint(1,8000)
            return nreq, nsrv, min(nreq,nsrv)
        
        def fuzz(tally, iterations, sizes, index_format):
            # Runs one fuzzing stage. 'sizes' returns the number of requests,
            # the number of servers and the minimum number of candidates per
            # request; 'tally' is updated in place.
            for i in range(iterations):
                nreq, nsrv, mincandidates = sizes()
                srv = range(nsrv)
                req = [
                    random.sample(srv, random.randint(mincandidates,nsrv))
                    for j in range(nreq)
                ]
                sys.stdout.write(("Trying " + index_format + ": #R = %4d, #S = %4d... ") % (i, nreq, nsrv))
                sys.stdout.flush()
                tally[1] += 1
                try:
                    alloc(req, sys.stdout, saveinteresting = interesting, verbose=False)
                    tally[0] += 1
                    sys.stdout.write("  OK - allocation successful  \r")
                except ResourceAllocationError:
                    sys.stdout.write("  OK - allocation not possible  \r")
                except KeyboardInterrupt:
                    print("ABORTING CASE %r" % (req,))
                    raise
                sys.stdout.flush()
        
        try:
            # Fuzzer - mostly impossible cases
            fuzz(mostlyimpossible, 10000, sizes_mostlyimpossible, "%5d")
            
            # Fuzzer - mostly possible cases
            fuzz(mostlypossible, 10000, sizes_mostlypossible, "%5d")
            
            # Fuzzer - biiig cases
            fuzz(huge, 10, sizes_huge, "%4d")
        except KeyboardInterrupt:
            print("ABORTING TEST")
        except Exception:
            # Do not hide real failures: show what went wrong, then still
            # report the statistics gathered so far
            traceback.print_exc()
            print("ABORTING TEST")
        
        print("\nSuccess rates:")
        print("  Mostly possible: %d/%d (%.2f%%)" % (mostlypossible[0], mostlypossible[1], 100.0 * mostlypossible[0] / max(1,mostlypossible[1])))
        print("  Mostly impossible: %d/%d (%.2f%%)" % (mostlyimpossible[0], mostlyimpossible[1], 100.0 * mostlyimpossible[0] / max(1,mostlyimpossible[1])))
        print("  Huge: %d/%d (%.2f%%)" % (huge[0], huge[1], 100.0 * huge[0] / max(1,huge[1])))
        
        if interesting:
            print("%d interesting requests:" % (len(interesting),))
            for n,req in enumerate(interesting):
                print("Interesting request %d/%d: %r" % (n+1,len(interesting),req,))
    test()
429