from nepi.util import ipaddr2
from nepi.util import environ
from nepi.util.parallel import ParallelRun
+import threading
import sys
import os
import os.path
self.slicename = None
self._traces = dict()
- import node, interfaces, application
+ import node, interfaces, application, multicast
self._node = node
self._interfaces = interfaces
self._app = application
+ self._multicast = multicast
self._blacklist = set()
self._just_provisioned = set()
self.dedicatedSlice = self._attributes.\
get_attribute_value("dedicatedSlice")
+ if not self.slicename:
+ raise RuntimeError, "Slice not set"
+ if not self.authUser:
+ raise RuntimeError, "PlanetLab account username not set"
+ if not self.authString:
+ raise RuntimeError, "PlanetLab account passphrase not set"
+ if not self.sliceSSHKey:
+ raise RuntimeError, "PlanetLab account key not specified"
+ if not os.path.exists(self.sliceSSHKey):
+ raise RuntimeError, "PlanetLab account key cannot be opened: %s" % (self.sliceSSHKey,)
+
self._logger.setLevel(getattr(logging,self.logLevel))
super(TestbedController, self).do_setup()
# Initial algo:
# look for perfectly defined nodes
# (ie: those with only one candidate)
- for guid, node in self._elements.iteritems():
- if isinstance(node, self._node.Node) and node._node_id is None:
- # Try existing nodes first
- # If we have only one candidate, simply use it
- candidates = node.find_candidates(
- filter_slice_id = self.slice_id)
+ reserve_lock = threading.Lock()
+ def assignifunique(guid, node):
+ # Try existing nodes first
+ # If we have only one candidate, simply use it
+ candidates = node.find_candidates(
+ filter_slice_id = self.slice_id)
+
+ node_id = None
+ reserve_lock.acquire()
+ try:
candidates -= reserved
if len(candidates) == 1:
node_id = iter(candidates).next()
- node.assign_node_id(node_id)
reserved.add(node_id)
elif not candidates:
# Try again including unassigned nodes
- candidates = node.find_candidates()
+ reserve_lock.release()
+ try:
+ candidates = node.find_candidates()
+ finally:
+ reserve_lock.acquire()
candidates -= reserved
if len(candidates) > 1:
- continue
+ return
if len(candidates) == 1:
node_id = iter(candidates).next()
- node.assign_node_id(node_id)
to_provision.add(node_id)
reserved.add(node_id)
elif not candidates:
raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
node.make_filter_description())
+ finally:
+ reserve_lock.release()
+
+ if node_id is not None:
+ node.assign_node_id(node_id)
+
+ runner = ParallelRun(maxthreads=4) # don't overload the PLC API, just 4 threads to hide latencies and that's it
+ runner.start()
+ for guid, node in self._elements.iteritems():
+ if isinstance(node, self._node.Node) and node._node_id is None:
+ runner.put(assignifunique, guid, node)
+ runner.sync()
# Now do the backtracking search for a suitable solution
# First with existing slice nodes
reqs = []
nodes = []
+ def genreqs(node, filter_slice_id=None):
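+ # Record this node's candidate set (minus already reserved ids) in reqs, and the node in nodes
+ # NOTE(review): appends to `nodes`, which the retry loop below iterates while calling this -- confirm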
+ candidates = node.find_candidates(
+ filter_slice_id = filter_slice_id)
+ candidates -= reserved
+ reqs.append(candidates)
+ nodes.append(node)
for guid, node in self._elements.iteritems():
if isinstance(node, self._node.Node) and node._node_id is None:
- # Try existing nodes first
- # If we have only one candidate, simply use it
- candidates = node.find_candidates(
- filter_slice_id = self.slice_id)
- candidates -= reserved
- reqs.append(candidates)
- nodes.append(node)
+ runner.put(genreqs, node, self.slice_id)
+ runner.sync()
if nodes and reqs:
if recover:
raise RuntimeError, "Impossible to recover: unassigned host for Nodes %r" % (nodes,)
+
+ def pickbest(fullset, nreq, node=nodes[0]):
+ if len(fullset) > nreq:
+ fullset = zip(node.rate_nodes(fullset),fullset)
+ fullset.sort(reverse=True)
+ del fullset[nreq:]
+ return set(map(operator.itemgetter(1),fullset))
+ else:
+ return fullset
try:
- solution = resourcealloc.alloc(reqs)
+ solution = resourcealloc.alloc(reqs, sample=pickbest)
except resourcealloc.ResourceAllocationError:
# Failed, try again with all nodes
reqs = []
for node in nodes:
- candidates = node.find_candidates()
- candidates -= reserved
- reqs.append(candidates)
-
- solution = resourcealloc.alloc(reqs)
+ runner.put(genreqs, node)
+ runner.sync()
+ solution = resourcealloc.alloc(reqs, sample=pickbest)
to_provision.update(solution)
# Do assign nodes
for node, node_id in zip(nodes, solution):
- node.assign_node_id(node_id)
+ runner.put(node.assign_node_id, node_id)
+ runner.join()
def do_provisioning(self):
if self._to_provision:
# Show the magic
self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)
-
+
try:
- for guid, node in self._elements.iteritems():
- if isinstance(node, self._node.Node):
- self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
-
+ runner = ParallelRun(maxthreads=64, maxqueue=1)
+ abort = []
+ def waitforit(guid, node):
+ try:
node.wait_provisioning(
(20*60 if node._node_id in self._just_provisioned else 60)
)
# Prepare dependency installer now
node.prepare_dependencies()
+ except:
+ abort.append(None)
+ raise
+
+ for guid, node in self._elements.iteritems():
+ if abort:
+ break
+ if isinstance(node, self._node.Node):
+ self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
+ runner.put(waitforit, guid, node)
+ runner.join()
+
except self._node.UnresponsiveNodeError:
# Uh...
- self._logger.warn("UNRESPONSIVE Node %s", node.hostname)
+ self._logger.warn("UNRESPONSIVE Nodes")
# Mark all dead nodes (which are unresponsive) on the blacklist
# and re-raise
app.node.architecture,
app.node.operatingSystem,
app.node.pl_distro,
+ app.__class__,
)
depgroups = collections.defaultdict(list)
suffix = ".pub")
# Create secure 256-bits temporary passphrase
- passphrase = ''.join(map(chr,[rng.randint(0,255)
- for rng in (random.SystemRandom(),)
- for i in xrange(32)] )).encode("hex")
+ passphrase = os.urandom(32).encode("hex")
# Copy keys
oprk = open(self.sliceSSHKey, "rb")
self._traces.clear()
def trace(self, guid, trace_id, attribute='value'):
- app = self._elements[guid]
+ elem = self._elements[guid]
if attribute == 'value':
- path = app.sync_trace(self.home_directory, trace_id)
+ path = elem.sync_trace(self.home_directory, trace_id)
if path:
fd = open(path, "r")
content = fd.read()
else:
content = None
elif attribute == 'path':
- content = app.remote_trace_path(trace_id)
+ content = elem.remote_trace_path(trace_id)
+ elif attribute == 'name':
+ content = elem.remote_trace_name(trace_id)
else:
content = None
return content
def _make_tun_filter(self, parameters):
return self._make_generic(parameters, self._interfaces.TunFilter)
+ def _make_class_queue_filter(self, parameters):
+ return self._make_generic(parameters, self._interfaces.ClassQueueFilter)
+
+ def _make_tos_queue_filter(self, parameters):
+ return self._make_generic(parameters, self._interfaces.ToSQueueFilter)
+
+ def _make_multicast_forwarder(self, parameters):
+ return self._make_generic(parameters, self._multicast.MulticastForwarder)
+
+ def _make_multicast_announcer(self, parameters):
+ return self._make_generic(parameters, self._multicast.MulticastAnnouncer)
+
+ def _make_multicast_router(self, parameters):
+ return self._make_generic(parameters, self._multicast.MulticastRouter)
+
+