Merging with HEAD
author Claudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Mon, 30 May 2011 14:46:20 +0000 (16:46 +0200)
committer Claudio-Daniel Freire <claudio-daniel.freire@inria.fr>
Mon, 30 May 2011 14:46:20 +0000 (16:46 +0200)
Fixing a few thingies in tunproto

setup.py
src/nepi/testbeds/planetlab/execute.py
src/nepi/testbeds/planetlab/metadata_v01.py
src/nepi/testbeds/planetlab/node.py
src/nepi/testbeds/planetlab/resourcealloc.py [new file with mode: 0644]
src/nepi/testbeds/planetlab/tunproto.py
src/nepi/util/settools/classify.py [new file with mode: 0644]
test/testbeds/planetlab/execute.py

index 36a1081..e6690a0 100755 (executable)
--- a/setup.py
+++ b/setup.py
@@ -18,6 +18,7 @@ setup(
             "nepi.testbeds.planetlab",
             "nepi.core",
             "nepi.util.parser",
+            "nepi.util.settools",
             "nepi.util" ],
         package_dir = {"": "src"},
         package_data = {"nepi.testbeds.planetlab" : ["scripts/*.py", "scripts/*.c"],
index d9506f5..4d4644d 100644 (file)
@@ -6,6 +6,7 @@ from nepi.core import testbed_impl
 from nepi.util.constants import TIME_NOW
 import os
 import time
+import resourcealloc
 
 class TestbedController(testbed_impl.TestbedController):
     def __init__(self, testbed_version):
@@ -80,12 +81,11 @@ class TestbedController(testbed_impl.TestbedController):
         super(TestbedController, self).do_preconfigure()
 
     def do_resource_discovery(self):
-        # Do what?
-
-        # Provisional algo:
+        to_provision = self._to_provision = set()
+        
+        # Initial algo:
         #   look for perfectly defined nodes
         #   (ie: those with only one candidate)
-        to_provision = self._to_provision = set()
         for guid, node in self._elements.iteritems():
             if isinstance(node, self._node.Node) and node._node_id is None:
                 # Try existing nodes first
@@ -98,13 +98,44 @@ class TestbedController(testbed_impl.TestbedController):
                     # Try again including unassigned nodes
                     candidates = node.find_candidates()
                     if len(candidates) > 1:
-                        raise RuntimeError, "Cannot assign resources for node %s, too many candidates" % (guid,)
+                        continue
                     if len(candidates) == 1:
                         node_id = iter(candidates).next()
                         node.assign_node_id(node_id)
                         to_provision.add(node_id)
                     elif not candidates:
-                        raise RuntimeError, "Cannot assign resources for node %s, no candidates" % (guid,)
+                        raise RuntimeError, "Cannot assign resources for node %s, no candidates with %s" % (guid,
+                            node.make_filter_description())
+        
+        # Now do the backtracking search for a suitable solution
+        # First with existing slice nodes
+        reqs = []
+        nodes = []
+        for guid, node in self._elements.iteritems():
+            if isinstance(node, self._node.Node) and node._node_id is None:
+                # Try existing nodes first
+                # If we have only one candidate, simply use it
+                candidates = node.find_candidates(
+                    filter_slice_id = self.slice_id)
+                reqs.append(candidates)
+                nodes.append(node)
+        
+        if nodes and reqs:
+            try:
+                solution = resourcealloc.alloc(reqs)
+            except resourcealloc.ResourceAllocationError:
+                # Failed, try again with all nodes
+                reqs = []
+                for node in nodes:
+                    candidates = node.find_candidates()
+                    reqs.append(candidates)
+                
+                solution = resourcealloc.alloc(reqs)
+                to_provision.update(solution)
+            
+            # Do assign nodes
+            for node, node_id in zip(nodes, solution):
+                node.assign_node_id(node_id)
 
     def do_provisioning(self):
         if self._to_provision:
index d5c0ab9..885c181 100644 (file)
@@ -43,13 +43,13 @@ def is_addrlist(attribute, value):
         if '/' in component:
             addr, mask = component.split('/',1)
         else:
-            addr, mask = component, 32
+            addr, mask = component, '32'
         
         if mask is not None and not (mask and mask.isdigit()):
             # No empty or nonnumeric masks
             return False
         
-        if not validation.is_ip4_address(attribute, value):
+        if not validation.is_ip4_address(attribute, addr):
             # Address part must be ipv4
             return False
         
@@ -166,12 +166,17 @@ def create_node(testbed_instance, guid):
     
     # add constraint on number of (real) interfaces
     # by counting connected devices
-    dev_guids = testbed_instance.get_connected(guid, "node", "devs")
+    dev_guids = testbed_instance.get_connected(guid, "devs", "node")
     num_open_ifaces = sum( # count True values
         NODEIFACE == testbed_instance._get_factory_id(guid)
         for guid in dev_guids )
     element.min_num_external_ifaces = num_open_ifaces
     
+    # require vroute vsys if we have routes to set up
+    routes = testbed_instance._add_route.get(guid)
+    if routes:
+        element.required_vsys.add("vroute")
+    
     testbed_instance.elements[guid] = element
 
 def create_nodeiface(testbed_instance, guid):
index 0b5b9de..6514372 100644 (file)
@@ -9,6 +9,7 @@ import time
 import os
 import collections
 import cStringIO
+import resourcealloc
 
 from nepi.util import server
 
@@ -23,12 +24,12 @@ class Node(object):
         #   There are replacements that are applied with string formatting,
         #   so '%' has to be escaped as '%%'.
         'architecture' : ('arch','value'),
-        'operating_system' : ('fcdistro','value'),
+        'operatingSystem' : ('fcdistro','value'),
         'pl_distro' : ('pldistro','value'),
-        'min_reliability' : ('reliability%(timeframe)s', ']value'),
-        'max_reliability' : ('reliability%(timeframe)s', '[value'),
-        'min_bandwidth' : ('bw%(timeframe)s', ']value'),
-        'max_bandwidth' : ('bw%(timeframe)s', '[value'),
+        'minReliability' : ('reliability%(timeframe)s', ']value'),
+        'maxReliability' : ('reliability%(timeframe)s', '[value'),
+        'minBandwidth' : ('bw%(timeframe)s', ']value'),
+        'maxBandwidth' : ('bw%(timeframe)s', '[value'),
     }    
     
     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
@@ -42,14 +43,14 @@ class Node(object):
         # Attributes
         self.hostname = None
         self.architecture = None
-        self.operating_system = None
+        self.operatingSystem = None
         self.pl_distro = None
         self.site = None
         self.emulation = None
-        self.min_reliability = None
-        self.max_reliability = None
-        self.min_bandwidth = None
-        self.max_bandwidth = None
+        self.minReliability = None
+        self.maxReliability = None
+        self.minBandwidth = None
+        self.maxBandwidth = None
         self.min_num_external_ifaces = None
         self.max_num_external_ifaces = None
         self.timeframe = 'm'
@@ -105,9 +106,14 @@ class Node(object):
         
         # get initial candidates (no tag filters)
         basefilters = self.build_filters({}, self.BASEFILTERS)
+        rootfilters = basefilters.copy()
         if filter_slice_id:
             basefilters['|slice_ids'] = (filter_slice_id,)
         
+        # only pick healthy nodes
+        basefilters['run_level'] = 'boot'
+        basefilters['boot_state'] = 'boot'
+        
         # keyword-only "pseudofilters"
         extra = {}
         if self.site:
@@ -123,7 +129,7 @@ class Node(object):
             
             # don't bother if there's no filter defined
             if attr in applicable:
-                tagfilter = basefilters.copy()
+                tagfilter = rootfilters.copy()
                 tagfilter['tagname'] = tagname % replacements
                 tagfilter[expr % replacements] = getattr(self,attr)
                 tagfilter['node_id'] = list(candidates)
@@ -178,6 +184,42 @@ class Node(object):
             candidates = set(filter(predicate, candidates))
             
         return candidates
+    
+    def make_filter_description(self):
+        """
+        Makes a human-readable description of filtering conditions
+        for find_candidates.
+        """
+        
+        # get initial candidates (no tag filters)
+        filters = self.build_filters({}, self.BASEFILTERS)
+        
+        # keyword-only "pseudofilters"
+        if self.site:
+            filters['peer'] = self.site
+            
+        # filter by tag, one tag at a time
+        applicable = self.applicable_filters
+        for tagfilter in self.TAGFILTERS.iteritems():
+            attr, (tagname, expr) = tagfilter
+            
+            # don't bother if there's no filter defined
+            if attr in applicable:
+                filters[attr] = getattr(self,attr)
+        
+        # filter by vsys tags - special case since it doesn't follow
+        # the usual semantics
+        if self.required_vsys:
+            filters['vsys'] = ','.join(list(self.required_vsys))
+        
+        # filter by iface count
+        if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None:
+            filters['num_ifaces'] = '-'.join([
+                str(self.min_num_external_ifaces or '0'),
+                str(self.max_num_external_ifaces or 'inf')
+            ])
+            
+        return '; '.join(map('%s: %s'.__mod__,filters.iteritems()))
 
     def assign_node_id(self, node_id):
         self._node_id = node_id
diff --git a/src/nepi/testbeds/planetlab/resourcealloc.py b/src/nepi/testbeds/planetlab/resourcealloc.py
new file mode 100644 (file)
index 0000000..bafde65
--- /dev/null
@@ -0,0 +1,429 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import itertools
+import functools
+import operator
+import random
+import collections
+import heapq
+
+from nepi.util.settools import setclusters
+from nepi.util.settools import classify
+
+class ResourceAllocationError(Exception):
+    pass
+
+def multicardinal(multiset):
+    return sum(quant for c,quant in multiset.iteritems())
+
+def avail(cls, partition):
+    contains = classify.classContains
+    return reduce(operator.or_, 
+        classify.classComponents(cls, partition))
+
+def _log(logstream, message, *args, **kwargs):
+    if logstream:
+        if args:
+            logstream.write(message % args)
+        elif kwargs:
+            logstream.write(message % kwargs)
+        else:
+            logstream.write(message)
+        logstream.write('\n')
+
+def alloc(requests, logstream = None, nonseparable = False, saveinteresting = None, backtracklim = 100000000, verbose = True):
+    """
+    Takes an iterable over requests, which are iterables of candidate node ids,
+    and returns a specific node id for each request (if successful).
+    
+    If it cannot, it will raise an ResourceAllocationError.
+    """
+    
+    # First, materialize the request iterable
+    requests = map(set,requests)
+    
+    # Classify all candidates
+    universe = reduce(operator.or_, requests)
+    partition = setclusters.disjoint_partition(*requests)
+    
+    # Classify requests
+    c_reqlist = classify.classify(requests, partition)
+    c_req = dict(
+        (c,len(r))
+        for c,r in c_reqlist.iteritems()
+    )
+    
+    # Classify universe
+    c_uni = map(len, partition)
+    
+    # Perform invariant sanity checks
+    if multicardinal(c_req) > sum(c_uni):
+        raise ResourceAllocationError, "Insufficient resources to grant request"
+    
+    for c,nreq in c_req.iteritems():
+        if nreq > len(avail(c, partition)):
+            raise ResourceAllocationError, "Insufficient resources to grant request, empty categories %s" % (
+                filter(lambda i : classify.classContains(c,i), xrange(len(c))),
+            )
+
+    # Test for separability
+    if nonseparable:
+        components = clusters = []
+    else:
+        components = [
+            classify.classMembers(c, partition)
+            for c in c_req
+        ]
+        clusters = setclusters.disjoint_sets(*components)
+    
+    if len(clusters) > 1:
+        if verbose:
+            _log(logstream, "\nDetected %d clusters", len(clusters))
+        
+        # Requests are separable
+        # Solve each part separately, then rejoin them
+        
+        # Build a class for each cluster
+        clustermaps = []
+        compmap = dict([(pid,idx) for idx,pid in enumerate(map(id,components))])
+        for cluster in clusters:
+            cluster_class = classify.getClass(
+                reduce(operator.or_, cluster),
+                partition )
+            clustermaps.append(cluster_class)
+        
+        # Build a plan: assign a cluster to each request
+        plan = []
+        for cluster_class in clustermaps:
+            plan_reqs = []
+            for c, c_requests in c_reqlist.iteritems():
+                if classify.isSubclass(cluster_class, c):
+                    plan_reqs.extend(c_requests)
+            plan.append(plan_reqs)
+        
+        # Execute the plan
+        partial_results = []
+        for i,plan_req in enumerate(plan):
+            if verbose:
+                _log(logstream, "Solving cluster %d/%d", i+1, len(plan))
+            partial_results.append(alloc(plan_req, 
+                logstream, 
+                nonseparable = True,
+                saveinteresting = saveinteresting,
+                backtracklim = backtracklim,
+                verbose = verbose))
+        
+        # Join results
+        if verbose:
+            _log(logstream, "Joining partial results")
+        reqmap = dict([(pid,idx) for idx,pid in enumerate(map(id,requests))])
+        joint = [None] * len(requests)
+        for partial_result, partial_requests in zip(partial_results, plan):
+                for assignment, partial_request in zip(partial_result, partial_requests):
+                    joint[reqmap[id(partial_request)]] = assignment
+        
+        return joint
+    else:
+        # Non-separable request, solve
+        #_log(logstream, "Non-separable request")
+        
+        # Solve
+        partial = collections.defaultdict(list)
+        Pavail = list(c_uni)
+        Gavail = dict([
+            (c, len(avail(c, partition)))
+            for c in c_req
+        ])
+        req = dict(c_req)
+        
+        # build a cardinality map
+        cardinality = dict([
+            (c, [classify.classCardinality(c,partition), -nreq])
+            for c,nreq in req.iteritems()
+        ])
+        
+        classContains = classify.classContains
+        isSubclass = classify.isSubclass
+        
+        stats = [
+            0, # ops
+            0, # back tracks
+            0, # skipped branches
+        ]
+        
+        def recursive_alloc():
+            # Successful termination condition: all requests satisfied
+            if not req:
+                return True
+            
+            # Try in cardinality order
+            if quickstage:
+                order = heapq.nsmallest(2, req, key=Gavail.__getitem__)
+            else:
+                order = sorted(req, key=Gavail.__getitem__)
+            
+            # Do backtracking on those whose cardinality leaves a choice
+            # Force a pick when it does not
+            if order and (Gavail[order[0]] <= 1
+                          or classify.classCardinality(order[0]) <= 1):
+                order = order[:1]
+            
+            for c in order:
+                nreq = req[c]
+                #carditem = cardinality[c]
+                for i,bit in enumerate(c):
+                    if bit == "1" and Pavail[i]:
+                        stats[0] += 1 # ops+1
+                        
+                        subreq = min(Pavail[i], nreq)
+                        
+                        # branch sanity check
+                        skip = False
+                        for c2,navail in Gavail.iteritems():
+                            if c2 != c and classContains(c2, i) and (navail - subreq) < req.get(c2,0):
+                                # Fail branch, don't even try
+                                skip = True
+                                break
+                        if skip:
+                            stats[2] += 1 # skipped branches + 1
+                            continue
+                        
+                        # forward track
+                        partial[c].append((i,subreq))
+                        Pavail[i] -= subreq
+                        #carditem[1] -= subreq
+                        
+                        for c2 in Gavail:
+                            if classContains(c2, i):
+                                Gavail[c2] -= subreq
+                        
+                        if subreq < nreq:
+                            req[c] -= subreq
+                        else:
+                            del req[c]
+                        
+                        # Try to solve recursively
+                        success = recursive_alloc()
+                        
+                        if success:
+                            return success
+                        
+                        # Back track
+                        del partial[c][-1]
+                        Pavail[i] += subreq
+                        #carditem[1] += subreq
+                        
+                        for c2 in Gavail:
+                            if classContains(c2, i):
+                                Gavail[c2] += subreq
+                        
+                        if subreq < nreq:
+                            req[c] += subreq
+                        else:
+                            req[c] = subreq
+                        
+                        stats[1] += 1 # backtracks + 1
+                        
+                        if (logstream or (saveinteresting is not None)) and (stats[1] & 0xffff) == 0:
+                            _log(logstream, "%r\n%r\n... stats: ops=%d, backtracks=%d, skipped=%d", Gavail, req,
+                                *stats)
+                            
+                            if stats[1] == 0x1400000:
+                                # Interesting case, log it out
+                                _log(logstream, "... interesting case: %r", requests)
+                                
+                                if saveinteresting is not None:
+                                    saveinteresting.append(requests)
+                if stats[1] > backtracklim:
+                    break
+                            
+            
+            # We tried and tried... and failed
+            return False
+        
+        # First try quickly (assign most selective first exclusively)
+        quickstage = True
+        success = recursive_alloc()
+        if not success:
+            # If it fails, retry exhaustively (try all assignment orders)
+            quickstage = False
+            success = recursive_alloc()
+        
+        if verbose or (not success or stats[1] or stats[2]):
+            _log(logstream, "%s with stats: ops=%d, backtracks=%d, skipped=%d",
+                ("Succeeded" if success else "Failed"),
+                *stats)
+        
+        if not success:
+            raise ResourceAllocationError, "Insufficient resources to grant request"
+        
+        # Perform actual assignment
+        Pavail = map(set, partition)
+        solution = {}
+        for c, partial_assignments in partial.iteritems():
+            psol = set()
+            for i, nreq in partial_assignments:
+                part = Pavail[i]
+                if len(part) < nreq:
+                    raise AssertionError, "Cannot allocate resources for supposedly valid solution!"
+                assigned = set(random.sample(part, nreq))
+                psol |= assigned
+                part -= assigned
+            solution[c] = psol
+        
+        # Format solution for the caller (return a node id for each request)
+        reqmap = {}
+        for c,reqs in c_reqlist.iteritems():
+            for req in reqs:
+                reqmap[id(req)] = c
+        
+        req_solution = []
+        for req in requests:
+            c = reqmap[id(req)]
+            req_solution.append(solution[c].pop())
+        
+        return req_solution
+
+
+if __name__ == '__main__':
+    def test():
+        import random
+        import sys
+        
+        toughcases = [
+          (False,
+            [[9, 11, 12, 14, 16, 17, 18, 20, 21], 
+             [2], 
+             [2], 
+             [4, 5, 6, 7, 8, 11, 12, 13, 18, 22], 
+             [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22], 
+             [6, 10, 11, 13, 14, 15, 16, 18, 20], 
+             [3, 7, 8, 9, 10, 12, 14, 17, 22], 
+             [0, 1, 3, 4, 5, 6, 7, 8, 10, 13, 14, 17, 19, 21, 22], 
+             [16, 22]]),
+          (False,
+            [[2, 10, 0, 3, 4, 8], 
+             [4, 1, 6, 10, 2, 0, 5, 9, 8, 7], 
+             [8, 3, 0, 2, 1, 4, 10, 7, 5], 
+             [8], 
+             [2], 
+             [2, 8], 
+             [2, 7, 8, 3, 1, 0, 9, 10, 5, 4, 6], 
+             [2, 4, 8, 10, 1, 3, 9], 
+             [3, 0, 5]]),
+          (True,
+            [[2, 10, 0, 3, 4, 8], 
+             [4, 1, 6, 10, 2, 0, 5, 9, 8, 7], 
+             [8, 3, 0, 2, 1, 4, 10, 7, 5], 
+             [8], 
+             [2, 8], 
+             [2, 7, 8, 3, 1, 0, 9, 10, 5, 4, 6], 
+             [2, 4, 8, 10, 1, 3, 9], 
+             [3, 0, 5]]),
+        ]
+        
+        # Test tough cases
+        for n,(solvable,req) in enumerate(toughcases):
+            print "Trying #R = %4d, #S = %4d (tough case %d)" % (len(req), len(reduce(operator.or_, map(set,req))), n)
+            try:
+                solution = alloc(req, sys.stdout, verbose=False)
+                if solvable:
+                    print "  OK - allocation successful"
+                else:
+                    raise AssertionError, "Case %r had no solution, but got %r" % (req, solution)
+            except ResourceAllocationError: 
+                if not solvable:
+                    print "  OK - allocation not possible"
+                else:
+                    raise AssertionError, "Case %r had a solution, but got none" % (req,)
+        
+        interesting = []
+        
+        suc_mostlypossible = mostlypossible = 0
+        suc_mostlyimpossible = mostlyimpossible = 0
+        suc_huge = huge = 0
+        
+        try:
+            # Fuzzer - mostly impossible cases
+            for i in xrange(10000):
+                nreq = random.randint(1,20)
+                nsrv = random.randint(max(1,nreq-5),50)
+                srv = range(nsrv)
+                req = [
+                    random.sample(srv, random.randint(1,nsrv))
+                    for j in xrange(nreq)
+                ]
+                print "Trying %5d: #R = %4d, #S = %4d... " % (i, nreq, nsrv),
+                sys.stdout.flush()
+                mostlyimpossible += 1
+                try:
+                    solution = alloc(req, sys.stdout, saveinteresting = interesting, verbose=False)
+                    suc_mostlyimpossible += 1
+                    print "  OK - allocation successful  \r",
+                except ResourceAllocationError: 
+                    print "  OK - allocation not possible  \r",
+                except KeyboardInterrupt:
+                    print "ABORTING CASE %r" % (req,)
+                    raise
+                sys.stdout.flush()
+
+            # Fuzzer - mostly possible cases
+            for i in xrange(10000):
+                nreq = random.randint(1,10)
+                nsrv = random.randint(nreq,100)
+                srv = range(nsrv)
+                req = [
+                    random.sample(srv, random.randint(min(nreq,nsrv),nsrv))
+                    for j in xrange(nreq)
+                ]
+                print "Trying %5d: #R = %4d, #S = %4d... " % (i, nreq, nsrv),
+                sys.stdout.flush()
+                mostlypossible += 1
+                try:
+                    solution = alloc(req, sys.stdout, saveinteresting = interesting, verbose=False)
+                    suc_mostlypossible += 1
+                    print "  OK - allocation successful  \r",
+                except ResourceAllocationError: 
+                    print "  OK - allocation not possible  \r",
+                except KeyboardInterrupt:
+                    print "ABORTING CASE %r" % (req,)
+                    raise
+                sys.stdout.flush()
+
+            # Fuzzer - biiig cases
+            for i in xrange(10):
+                nreq = random.randint(1,500)
+                nsrv = random.randint(1,8000)
+                srv = range(nsrv)
+                req = [
+                    random.sample(srv, random.randint(min(nreq,nsrv),nsrv))
+                    for j in xrange(nreq)
+                ]
+                print "Trying %4d: #R = %4d, #S = %4d... " % (i, nreq, nsrv),
+                sys.stdout.flush()
+                huge += 1
+                try:
+                    solution = alloc(req, sys.stdout, saveinteresting = interesting, verbose=False)
+                    suc_huge += 1
+                    print "  OK - allocation successful  \r",
+                except ResourceAllocationError: 
+                    print "  OK - allocation not possible  \r",
+                except KeyboardInterrupt:
+                    print "ABORTING CASE %r" % (req,)
+                    raise
+                sys.stdout.flush()
+        except:
+            print "ABORTING TEST"
+        
+        print "\nSuccess rates:"
+        print "  Mostly possible: %d/%d (%.2f%%)" % (suc_mostlypossible, mostlypossible, 100.0 * suc_mostlypossible / max(1,mostlypossible))
+        print "  Mostly impossible: %d/%d (%.2f%%)" % (suc_mostlyimpossible, mostlyimpossible, 100.0 * suc_mostlyimpossible / max(1,mostlyimpossible))
+        print "  Huge: %d/%d (%.2f%%)" % (suc_huge, huge, 100.0 * suc_huge / max(1,huge))
+        
+        if interesting:
+            print "%d interesting requests:" % (len(interesting),)
+            for n,req in enumerate(interesting):
+                print "Interesting request %d/%d: %r" % (n,len(interesting),req,)
+    test()
+
index df23a7b..0ea63b9 100644 (file)
@@ -425,6 +425,11 @@ class TunProtoUDP(TunProtoBase):
     def shutdown(self):
         self.kill()
 
+    def launch(self, check_proto='udp', listen=False, extra_args=None):
+        if extra_args is None:
+            extra_args = ("-u",str(self.port))
+        super(TunProtoUDP, self).launch(check_proto, listen, extra_args)
+
 class TunProtoFD(TunProtoBase):
     def __init__(self, local, peer, home_path, key, listening):
         super(TunProtoFD, self).__init__(local, peer, home_path, key)
@@ -439,6 +444,9 @@ class TunProtoFD(TunProtoBase):
     def shutdown(self):
         self.kill()
 
+    def launch(self, check_proto='fd', listen=False, extra_args=[]):
+        super(TunProtoFD, self).launch(check_proto, listen, extra_args)
+
 class TunProtoTCP(TunProtoBase):
     def __init__(self, local, peer, home_path, key, listening):
         super(TunProtoTCP, self).__init__(local, peer, home_path, key)
@@ -466,6 +474,11 @@ class TunProtoTCP(TunProtoBase):
     def shutdown(self):
         self.kill()
 
+    def launch(self, check_proto='tcp', listen=None, extra_args=[]):
+        if listen is None:
+            listen = self.listening
+        super(TunProtoTCP, self).launch(check_proto, listen, extra_args)
+
 class TapProtoUDP(TunProtoUDP):
     def __init__(self, local, peer, home_path, key, listening):
         super(TapProtoUDP, self).__init__(local, peer, home_path, key, listening)
diff --git a/src/nepi/util/settools/classify.py b/src/nepi/util/settools/classify.py
new file mode 100644 (file)
index 0000000..2eed402
--- /dev/null
@@ -0,0 +1,76 @@
+import setclusters
+import collections
+import itertools
+import operator
+
+def classify(requests, partition):
+    """
+    Takes an iterable over requests and a classification, and classifies the requests
+    returning a mapping from their classification (bitmap of applicable partitions) to
+    lists of requests.
+    
+    Params:
+    
+        requests: iterable over sets of viable candidates for a request
+        
+        partition: sequence of sets of candidates that forms a partition
+            over all available candidates.
+    
+    Returns:
+        
+        { str : [requests] }
+    """
+    rv = collections.defaultdict(list)
+    
+    for request in requests:
+        rv[getClass(request, partition)].append(request)
+    
+    return dict(rv)
+
+def getClass(set, partition):
+    return "".join(
+        map("01".__getitem__, [
+            bool(set & part)
+            for part in partition
+        ])
+    )
+    
+
+def isSubclass(superclass, subclass):
+    """
+    Returns True iff 'superclass' includes all elements of 'subclass'
+    
+    >>> isSubclass("1100","1000")
+    True
+    >>> isSubclass("1100","1100")
+    True
+    >>> isSubclass("0000","0001")
+    False
+    """
+    for superbit, subbit in itertools.izip(superclass, subclass):
+        if subbit == "1" and superbit == "0":
+            return False
+    else:
+        return True
+
+def classContains(clz, partIndex):
+    return clz[partIndex] == "1"
+
+def classCardinality(clz, partition = None):
+    if not partition:
+        return sum(itertools.imap("1".__eq__, clz))
+    else:
+        return sum(len(part) for bit,part in zip(clz,partition) 
+                   if bit == "1" )
+
+def classMembers(clz, partition):
+    return reduce(operator.or_, classComponents(clz, partition))
+
+def classComponents(clz, partition):
+    return [
+        partition[i]
+        for i,bit in enumerate(clz)
+        if bit == "1"
+    ]
+
+
index 920ef03..ecff9e6 100755 (executable)
@@ -509,6 +509,63 @@ echo 'OKIDOKI'
         
         # asserts at the end, to make sure there's proper cleanup
         self.assertEqual(ping_result, "")
+
+    @test_util.skipUnless(test_util.pl_auth() is not None, "Test requires PlanetLab authentication info (PL_USER and PL_PASS environment variables)")
+    def test_discovery(self):
+        instance = self.make_instance()
+        
+        instance.defer_create(2, "Node")
+        instance.defer_create_set(2, "operatingSystem", "f12")
+        instance.defer_create(3, "Node")
+        instance.defer_create_set(3, "operatingSystem", "f12")
+        instance.defer_create(4, "NodeInterface")
+        instance.defer_connect(2, "devs", 4, "node")
+        instance.defer_create(5, "NodeInterface")
+        instance.defer_connect(3, "devs", 5, "node")
+        instance.defer_create(6, "Internet")
+        instance.defer_connect(4, "inet", 6, "devs")
+        instance.defer_connect(5, "inet", 6, "devs")
+        instance.defer_create(7, "Application")
+        instance.defer_create_set(7, "command", "ping -qc1 {#[GUID-5].addr[0].[Address]#}")
+        instance.defer_add_trace(7, "stdout")
+        instance.defer_add_trace(7, "stderr")
+        instance.defer_connect(7, "node", 2, "apps")
+
+        comp_result = r"""PING .* \(.*\) \d*\(\d*\) bytes of data.
+
+--- .* ping statistics ---
+1 packets transmitted, 1 received, 0% packet loss, time \d*ms.*
+"""
+
+        try:
+            instance.do_setup()
+            instance.do_create()
+            instance.do_connect_init()
+            instance.do_connect_compl()
+            instance.do_preconfigure()
+            
+            # Manually replace netref
+            instance.set(7, "command",
+                instance.get(7, "command")
+                    .replace("{#[GUID-5].addr[0].[Address]#}", 
+                        instance.get_address(5, 0, "Address") )
+            )
+
+            instance.do_configure()
+            
+            instance.do_prestart()
+            instance.start()
+            while instance.status(7) != STATUS_FINISHED:
+                time.sleep(0.5)
+            ping_result = instance.trace(7, "stdout") or ""
+            instance.stop()
+        finally:
+            instance.shutdown()
+
+        # asserts at the end, to make sure there's proper cleanup
+        self.assertTrue(re.match(comp_result, ping_result, re.MULTILINE),
+            "Unexpected trace:\n" + ping_result)
+        
         
 
 if __name__ == '__main__':