import os
import collections
import cStringIO
+import resourcealloc
from nepi.util import server
# There are replacements that are applied with string formatting,
# so '%' has to be escaped as '%%'.
'architecture' : ('arch','value'),
- 'operating_system' : ('fcdistro','value'),
+ 'operatingSystem' : ('fcdistro','value'),
'pl_distro' : ('pldistro','value'),
- 'min_reliability' : ('reliability%(timeframe)s', ']value'),
- 'max_reliability' : ('reliability%(timeframe)s', '[value'),
- 'min_bandwidth' : ('bw%(timeframe)s', ']value'),
- 'max_bandwidth' : ('bw%(timeframe)s', '[value'),
+ 'minReliability' : ('reliability%(timeframe)s', ']value'),
+ 'maxReliability' : ('reliability%(timeframe)s', '[value'),
+ 'minBandwidth' : ('bw%(timeframe)s', ']value'),
+ 'maxBandwidth' : ('bw%(timeframe)s', '[value'),
}
DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
# Attributes
self.hostname = None
self.architecture = None
- self.operating_system = None
+ self.operatingSystem = None
self.pl_distro = None
self.site = None
self.emulation = None
- self.min_reliability = None
- self.max_reliability = None
- self.min_bandwidth = None
- self.max_bandwidth = None
+ self.minReliability = None
+ self.maxReliability = None
+ self.minBandwidth = None
+ self.maxBandwidth = None
self.min_num_external_ifaces = None
self.max_num_external_ifaces = None
self.timeframe = 'm'
# get initial candidates (no tag filters)
basefilters = self.build_filters({}, self.BASEFILTERS)
+ rootfilters = basefilters.copy()
if filter_slice_id:
basefilters['|slice_ids'] = (filter_slice_id,)
+ # only pick healthy nodes
+ basefilters['run_level'] = 'boot'
+ basefilters['boot_state'] = 'boot'
+
# keyword-only "pseudofilters"
extra = {}
if self.site:
# don't bother if there's no filter defined
if attr in applicable:
- tagfilter = basefilters.copy()
+ tagfilter = rootfilters.copy()
tagfilter['tagname'] = tagname % replacements
tagfilter[expr % replacements] = getattr(self,attr)
tagfilter['node_id'] = list(candidates)
candidates = set(filter(predicate, candidates))
return candidates
+
+ def make_filter_description(self):
+ """
+ Makes a human-readable description of filtering conditions
+ for find_candidates.
+ """
+
+ # get initial candidates (no tag filters)
+ filters = self.build_filters({}, self.BASEFILTERS)
+
+ # keyword-only "pseudofilters"
+ if self.site:
+ filters['peer'] = self.site
+
+ # filter by tag, one tag at a time
+ applicable = self.applicable_filters
+ for tagfilter in self.TAGFILTERS.iteritems():
+ attr, (tagname, expr) = tagfilter
+
+ # don't bother if there's no filter defined
+ if attr in applicable:
+ filters[attr] = getattr(self,attr)
+
+ # filter by vsys tags - special case since it doesn't follow
+ # the usual semantics
+ if self.required_vsys:
+ filters['vsys'] = ','.join(list(self.required_vsys))
+
+ # filter by iface count
+ if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
+ filters['num_ifaces'] = '-'.join([
+ str(self.min_num_external_ifaces or '0'),
+ str(self.max_num_external_ifaces or 'inf')
+ ])
+
+ return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
def assign_node_id(self, node_id):
self._node_id = node_id
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import itertools
+import functools
+import operator
+import random
+import collections
+import heapq
+
+from nepi.util.settools import setclusters
+from nepi.util.settools import classify
+
+class ResourceAllocationError(Exception):
+ pass
+
+def multicardinal(multiset):
+ return sum(quant for c,quant in multiset.iteritems())
+
+def avail(cls, partition):
+ contains = classify.classContains
+ return reduce(operator.or_,
+ classify.classComponents(cls, partition))
+
+def _log(logstream, message, *args, **kwargs):
+ if logstream:
+ if args:
+ logstream.write(message % args)
+ elif kwargs:
+ logstream.write(message % kwargs)
+ else:
+ logstream.write(message)
+ logstream.write('\n')
+
+def alloc(requests, logstream = None, nonseparable = False, saveinteresting = None, backtracklim = 100000000, verbose = True):
+ """
+ Takes an iterable over requests, which are iterables of candidate node ids,
+ and returns a specific node id for each request (if successful).
+
+ If it cannot, it will raise an ResourceAllocationError.
+ """
+
+ # First, materialize the request iterable
+ requests = map(set,requests)
+
+ # Classify all candidates
+ universe = reduce(operator.or_, requests)
+ partition = setclusters.disjoint_partition(*requests)
+
+ # Classify requests
+ c_reqlist = classify.classify(requests, partition)
+ c_req = dict(
+ (c,len(r))
+ for c,r in c_reqlist.iteritems()
+ )
+
+ # Classify universe
+ c_uni = map(len, partition)
+
+ # Perform invariant sanity checks
+ if multicardinal(c_req) > sum(c_uni):
+ raise ResourceAllocationError, "Insufficient resources to grant request"
+
+ for c,nreq in c_req.iteritems():
+ if nreq > len(avail(c, partition)):
+ raise ResourceAllocationError, "Insufficient resources to grant request, empty categories %s" % (
+ filter(lambda i : classify.classContains(c,i), xrange(len(c))),
+ )
+
+ # Test for separability
+ if nonseparable:
+ components = clusters = []
+ else:
+ components = [
+ classify.classMembers(c, partition)
+ for c in c_req
+ ]
+ clusters = setclusters.disjoint_sets(*components)
+
+ if len(clusters) > 1:
+ if verbose:
+ _log(logstream, "\nDetected %d clusters", len(clusters))
+
+ # Requests are separable
+ # Solve each part separately, then rejoin them
+
+ # Build a class for each cluster
+ clustermaps = []
+ compmap = dict([(pid,idx) for idx,pid in enumerate(map(id,components))])
+ for cluster in clusters:
+ cluster_class = classify.getClass(
+ reduce(operator.or_, cluster),
+ partition )
+ clustermaps.append(cluster_class)
+
+ # Build a plan: assign a cluster to each request
+ plan = []
+ for cluster_class in clustermaps:
+ plan_reqs = []
+ for c, c_requests in c_reqlist.iteritems():
+ if classify.isSubclass(cluster_class, c):
+ plan_reqs.extend(c_requests)
+ plan.append(plan_reqs)
+
+ # Execute the plan
+ partial_results = []
+ for i,plan_req in enumerate(plan):
+ if verbose:
+ _log(logstream, "Solving cluster %d/%d", i+1, len(plan))
+ partial_results.append(alloc(plan_req,
+ logstream,
+ nonseparable = True,
+ saveinteresting = saveinteresting,
+ backtracklim = backtracklim,
+ verbose = verbose))
+
+ # Join results
+ if verbose:
+ _log(logstream, "Joining partial results")
+ reqmap = dict([(pid,idx) for idx,pid in enumerate(map(id,requests))])
+ joint = [None] * len(requests)
+ for partial_result, partial_requests in zip(partial_results, plan):
+ for assignment, partial_request in zip(partial_result, partial_requests):
+ joint[reqmap[id(partial_request)]] = assignment
+
+ return joint
+ else:
+ # Non-separable request, solve
+ #_log(logstream, "Non-separable request")
+
+ # Solve
+ partial = collections.defaultdict(list)
+ Pavail = list(c_uni)
+ Gavail = dict([
+ (c, len(avail(c, partition)))
+ for c in c_req
+ ])
+ req = dict(c_req)
+
+ # build a cardinality map
+ cardinality = dict([
+ (c, [classify.classCardinality(c,partition), -nreq])
+ for c,nreq in req.iteritems()
+ ])
+
+ classContains = classify.classContains
+ isSubclass = classify.isSubclass
+
+ stats = [
+ 0, # ops
+ 0, # back tracks
+ 0, # skipped branches
+ ]
+
+ def recursive_alloc():
+ # Successful termination condition: all requests satisfied
+ if not req:
+ return True
+
+ # Try in cardinality order
+ if quickstage:
+ order = heapq.nsmallest(2, req, key=Gavail.__getitem__)
+ else:
+ order = sorted(req, key=Gavail.__getitem__)
+
+ # Do backtracking on those whose cardinality leaves a choice
+ # Force a pick when it does not
+ if order and (Gavail[order[0]] <= 1
+ or classify.classCardinality(order[0]) <= 1):
+ order = order[:1]
+
+ for c in order:
+ nreq = req[c]
+ #carditem = cardinality[c]
+ for i,bit in enumerate(c):
+ if bit == "1" and Pavail[i]:
+ stats[0] += 1 # ops+1
+
+ subreq = min(Pavail[i], nreq)
+
+ # branch sanity check
+ skip = False
+ for c2,navail in Gavail.iteritems():
+ if c2 != c and classContains(c2, i) and (navail - subreq) < req.get(c2,0):
+ # Fail branch, don't even try
+ skip = True
+ break
+ if skip:
+ stats[2] += 1 # skipped branches + 1
+ continue
+
+ # forward track
+ partial[c].append((i,subreq))
+ Pavail[i] -= subreq
+ #carditem[1] -= subreq
+
+ for c2 in Gavail:
+ if classContains(c2, i):
+ Gavail[c2] -= subreq
+
+ if subreq < nreq:
+ req[c] -= subreq
+ else:
+ del req[c]
+
+ # Try to solve recursively
+ success = recursive_alloc()
+
+ if success:
+ return success
+
+ # Back track
+ del partial[c][-1]
+ Pavail[i] += subreq
+ #carditem[1] += subreq
+
+ for c2 in Gavail:
+ if classContains(c2, i):
+ Gavail[c2] += subreq
+
+ if subreq < nreq:
+ req[c] += subreq
+ else:
+ req[c] = subreq
+
+ stats[1] += 1 # backtracks + 1
+
+ if (logstream or (saveinteresting is not None)) and (stats[1] & 0xffff) == 0:
+ _log(logstream, "%r\n%r\n... stats: ops=%d, backtracks=%d, skipped=%d", Gavail, req,
+ *stats)
+
+ if stats[1] == 0x1400000:
+ # Interesting case, log it out
+ _log(logstream, "... interesting case: %r", requests)
+
+ if saveinteresting is not None:
+ saveinteresting.append(requests)
+ if stats[1] > backtracklim:
+ break
+
+
+ # We tried and tried... and failed
+ return False
+
+ # First try quickly (assign most selective first exclusively)
+ quickstage = True
+ success = recursive_alloc()
+ if not success:
+ # If it fails, retry exhaustively (try all assignment orders)
+ quickstage = False
+ success = recursive_alloc()
+
+ if verbose or (not success or stats[1] or stats[2]):
+ _log(logstream, "%s with stats: ops=%d, backtracks=%d, skipped=%d",
+ ("Succeeded" if success else "Failed"),
+ *stats)
+
+ if not success:
+ raise ResourceAllocationError, "Insufficient resources to grant request"
+
+ # Perform actual assignment
+ Pavail = map(set, partition)
+ solution = {}
+ for c, partial_assignments in partial.iteritems():
+ psol = set()
+ for i, nreq in partial_assignments:
+ part = Pavail[i]
+ if len(part) < nreq:
+ raise AssertionError, "Cannot allocate resources for supposedly valid solution!"
+ assigned = set(random.sample(part, nreq))
+ psol |= assigned
+ part -= assigned
+ solution[c] = psol
+
+ # Format solution for the caller (return a node id for each request)
+ reqmap = {}
+ for c,reqs in c_reqlist.iteritems():
+ for req in reqs:
+ reqmap[id(req)] = c
+
+ req_solution = []
+ for req in requests:
+ c = reqmap[id(req)]
+ req_solution.append(solution[c].pop())
+
+ return req_solution
+
+
+if __name__ == '__main__':
+ def test():
+ import random
+ import sys
+
+ toughcases = [
+ (False,
+ [[9, 11, 12, 14, 16, 17, 18, 20, 21],
+ [2],
+ [2],
+ [4, 5, 6, 7, 8, 11, 12, 13, 18, 22],
+ [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22],
+ [6, 10, 11, 13, 14, 15, 16, 18, 20],
+ [3, 7, 8, 9, 10, 12, 14, 17, 22],
+ [0, 1, 3, 4, 5, 6, 7, 8, 10, 13, 14, 17, 19, 21, 22],
+ [16, 22]]),
+ (False,
+ [[2, 10, 0, 3, 4, 8],
+ [4, 1, 6, 10, 2, 0, 5, 9, 8, 7],
+ [8, 3, 0, 2, 1, 4, 10, 7, 5],
+ [8],
+ [2],
+ [2, 8],
+ [2, 7, 8, 3, 1, 0, 9, 10, 5, 4, 6],
+ [2, 4, 8, 10, 1, 3, 9],
+ [3, 0, 5]]),
+ (True,
+ [[2, 10, 0, 3, 4, 8],
+ [4, 1, 6, 10, 2, 0, 5, 9, 8, 7],
+ [8, 3, 0, 2, 1, 4, 10, 7, 5],
+ [8],
+ [2, 8],
+ [2, 7, 8, 3, 1, 0, 9, 10, 5, 4, 6],
+ [2, 4, 8, 10, 1, 3, 9],
+ [3, 0, 5]]),
+ ]
+
+ # Test tough cases
+ for n,(solvable,req) in enumerate(toughcases):
+ print "Trying #R = %4d, #S = %4d (tough case %d)" % (len(req), len(reduce(operator.or_, map(set,req))), n)
+ try:
+ solution = alloc(req, sys.stdout, verbose=False)
+ if solvable:
+ print " OK - allocation successful"
+ else:
+ raise AssertionError, "Case %r had no solution, but got %r" % (req, solution)
+ except ResourceAllocationError:
+ if not solvable:
+ print " OK - allocation not possible"
+ else:
+ raise AssertionError, "Case %r had a solution, but got none" % (req,)
+
+ interesting = []
+
+ suc_mostlypossible = mostlypossible = 0
+ suc_mostlyimpossible = mostlyimpossible = 0
+ suc_huge = huge = 0
+
+ try:
+ # Fuzzer - mostly impossible cases
+ for i in xrange(10000):
+ nreq = random.randint(1,20)
+ nsrv = random.randint(max(1,nreq-5),50)
+ srv = range(nsrv)
+ req = [
+ random.sample(srv, random.randint(1,nsrv))
+ for j in xrange(nreq)
+ ]
+ print "Trying %5d: #R = %4d, #S = %4d... " % (i, nreq, nsrv),
+ sys.stdout.flush()
+ mostlyimpossible += 1
+ try:
+ solution = alloc(req, sys.stdout, saveinteresting = interesting, verbose=False)
+ suc_mostlyimpossible += 1
+ print " OK - allocation successful \r",
+ except ResourceAllocationError:
+ print " OK - allocation not possible \r",
+ except KeyboardInterrupt:
+ print "ABORTING CASE %r" % (req,)
+ raise
+ sys.stdout.flush()
+
+ # Fuzzer - mostly possible cases
+ for i in xrange(10000):
+ nreq = random.randint(1,10)
+ nsrv = random.randint(nreq,100)
+ srv = range(nsrv)
+ req = [
+ random.sample(srv, random.randint(min(nreq,nsrv),nsrv))
+ for j in xrange(nreq)
+ ]
+ print "Trying %5d: #R = %4d, #S = %4d... " % (i, nreq, nsrv),
+ sys.stdout.flush()
+ mostlypossible += 1
+ try:
+ solution = alloc(req, sys.stdout, saveinteresting = interesting, verbose=False)
+ suc_mostlypossible += 1
+ print " OK - allocation successful \r",
+ except ResourceAllocationError:
+ print " OK - allocation not possible \r",
+ except KeyboardInterrupt:
+ print "ABORTING CASE %r" % (req,)
+ raise
+ sys.stdout.flush()
+
+ # Fuzzer - biiig cases
+ for i in xrange(10):
+ nreq = random.randint(1,500)
+ nsrv = random.randint(1,8000)
+ srv = range(nsrv)
+ req = [
+ random.sample(srv, random.randint(min(nreq,nsrv),nsrv))
+ for j in xrange(nreq)
+ ]
+ print "Trying %4d: #R = %4d, #S = %4d... " % (i, nreq, nsrv),
+ sys.stdout.flush()
+ huge += 1
+ try:
+ solution = alloc(req, sys.stdout, saveinteresting = interesting, verbose=False)
+ suc_huge += 1
+ print " OK - allocation successful \r",
+ except ResourceAllocationError:
+ print " OK - allocation not possible \r",
+ except KeyboardInterrupt:
+ print "ABORTING CASE %r" % (req,)
+ raise
+ sys.stdout.flush()
+ except:
+ print "ABORTING TEST"
+
+ print "\nSuccess rates:"
+ print " Mostly possible: %d/%d (%.2f%%)" % (suc_mostlypossible, mostlypossible, 100.0 * suc_mostlypossible / max(1,mostlypossible))
+ print " Mostly impossible: %d/%d (%.2f%%)" % (suc_mostlyimpossible, mostlyimpossible, 100.0 * suc_mostlyimpossible / max(1,mostlyimpossible))
+ print " Huge: %d/%d (%.2f%%)" % (suc_huge, huge, 100.0 * suc_huge / max(1,huge))
+
+ if interesting:
+ print "%d interesting requests:" % (len(interesting),)
+ for n,req in enumerate(interesting):
+ print "Interesting request %d/%d: %r", (n,len(interesting),req,)
+ test()
+