from nepi.util import ipaddr2
from nepi.util import environ
from nepi.util.parallel import ParallelRun
+import threading
import sys
import os
import os.path
# Initial algo:
# look for perfectly defined nodes
# (i.e. those with only one candidate)
- for guid, node in self._elements.iteritems():
- if isinstance(node, self._node.Node) and node._node_id is None:
- # Try existing nodes first
- # If we have only one candidate, simply use it
- candidates = node.find_candidates(
- filter_slice_id = self.slice_id)
+ # Guards the shared 'reserved' set across worker threads
+ reserve_lock = threading.Lock()
+ def assignifunique(guid, node):
+ # Try existing nodes first
+ # If we have only one candidate, simply use it
+ candidates = node.find_candidates(
+ filter_slice_id = self.slice_id)
+
+ node_id = None
+ reserve_lock.acquire()
+ try:
candidates -= reserved
if len(candidates) == 1:
node_id = iter(candidates).next()
- node.assign_node_id(node_id)
reserved.add(node_id)
elif not candidates:
# Try again including unassigned nodes
- candidates = node.find_candidates()
+ # Drop the lock around the slow candidate lookup so the
+ # other workers aren't held up
+ reserve_lock.release()
+ try:
+ candidates = node.find_candidates()
+ finally:
+ reserve_lock.acquire()
candidates -= reserved
if len(candidates) > 1:
- continue
+ return
if len(candidates) == 1:
node_id = iter(candidates).next()
- node.assign_node_id(node_id)
to_provision.add(node_id)
reserved.add(node_id)
elif not candidates:
raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
node.make_filter_description())
+ finally:
+ reserve_lock.release()
+
+ if node_id is not None:
+ # Do the actual assignment outside the lock; only the
+ # shared 'reserved' set needs serializing
+ node.assign_node_id(node_id)
+
+ runner = ParallelRun(maxthreads=4) # don't overload the PLC API: 4 threads are enough to hide latency
+ runner.start()
+ for guid, node in self._elements.iteritems():
+ if isinstance(node, self._node.Node) and node._node_id is None:
+ runner.put(assignifunique, guid, node)
+ runner.sync()
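
For context, the patch relies on ParallelRun behaving like a bounded worker pool: start() launches the workers, put() queues a call (blocking once maxqueue is reached), sync() waits for queued work to drain, and join() additionally stops the workers. Worker exceptions also have to resurface in the caller, since the except clause around runner.join() further down depends on that. What follows is a minimal sketch of that contract under those assumptions, not nepi's actual nepi.util.parallel implementation:

import sys
import threading
import Queue  # Python 2; 'queue' on Python 3

class ParallelRunSketch(object):
    def __init__(self, maxthreads=4, maxqueue=0):
        self._queue = Queue.Queue(maxqueue)
        self._errors = []
        self._threads = [threading.Thread(target=self._worker)
                         for _ in xrange(maxthreads)]

    def start(self):
        for t in self._threads:
            t.setDaemon(True)
            t.start()

    def put(self, task, *args):
        # Blocks when maxqueue is full, giving the producer backpressure
        self._queue.put((task, args))

    def _worker(self):
        while True:
            task, args = self._queue.get()
            try:
                if task is None:
                    return
                task(*args)
            except:
                # Remember the failure so sync()/join() can re-raise
                # it in the caller's thread
                self._errors.append(sys.exc_info())
            finally:
                self._queue.task_done()

    def _reraise(self):
        if self._errors:
            typ, val, tb = self._errors[0]
            raise typ, val, tb

    def sync(self):
        # Wait until every queued call has executed
        self._queue.join()
        self._reraise()

    def join(self):
        # Drain the queue, stop the workers and wait for them
        self._queue.join()
        for _ in self._threads:
            self._queue.put((None, ()))
        for t in self._threads:
            t.join()
        self._reraise()
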
# Now do the backtracking search for a suitable solution
# First with existing slice nodes
reqs = []
nodes = []
+ def genreqs(node, filter_slice_id=None):
+ # Gather this node's candidate set (restricted to nodes
+ # already in the slice when filter_slice_id is given)
+ candidates = node.find_candidates(
+ filter_slice_id = filter_slice_id)
+ candidates -= reserved
+ reqs.append(candidates)
+ nodes.append(node)
for guid, node in self._elements.iteritems():
if isinstance(node, self._node.Node) and node._node_id is None:
- # Try existing nodes first
- # If we have only one candidate, simply use it
- candidates = node.find_candidates(
- filter_slice_id = self.slice_id)
- candidates -= reserved
- reqs.append(candidates)
- nodes.append(node)
+ runner.put(genreqs, node, self.slice_id)
+ runner.sync()
if nodes and reqs:
if recover:
# Failed, try again with all nodes
reqs = []
for node in nodes:
- candidates = node.find_candidates()
- candidates -= reserved
- reqs.append(candidates)
-
+ runner.put(genreqs, node)
+ runner.sync()
solution = resourcealloc.alloc(reqs, sample=pickbest)
to_provision.update(solution)
# Do assign nodes
for node, node_id in zip(nodes, solution):
- node.assign_node_id(node_id)
+ runner.put(node.assign_node_id, node_id)
+ runner.join()
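
The backtracking step is easiest to follow through the data it consumes: genreqs leaves reqs as a list of candidate node_id sets, one per unassigned node, and resourcealloc.alloc has to return one distinct node_id per set. Below is a toy stand-in for that contract using plain depth-first backtracking; nepi's real allocator and its sample=pickbest heuristic are more elaborate:

def toy_alloc(reqs):
    # reqs: list of candidate node_id sets, one per node to assign.
    # Returns one distinct node_id per set, or raises if infeasible.
    solution = [None] * len(reqs)
    def backtrack(i, taken):
        if i == len(reqs):
            return True
        # A real solver would try the most constrained sets first;
        # this sketch just goes in order.
        for node_id in reqs[i]:
            if node_id not in taken:
                solution[i] = node_id
                taken.add(node_id)
                if backtrack(i + 1, taken):
                    return True
                taken.discard(node_id)
        return False
    if not backtrack(0, set()):
        raise RuntimeError, "No feasible node assignment"
    return solution

# Example: three nodes competing for overlapping candidates
print toy_alloc([set([1, 2]), set([1]), set([2, 3])])  # e.g. [2, 1, 3]
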
def do_provisioning(self):
if self._to_provision:
# Show the magic
self._logger.info("PlanetLab Node %s configured at %s", guid, node.hostname)
-
+
try:
- for guid, node in self._elements.iteritems():
- if isinstance(node, self._node.Node):
- self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
-
+ # maxqueue=1: put() blocks until a worker is free, so the
+ # dispatch loop below can notice an abort promptly
+ runner = ParallelRun(maxqueue=1)
+ abort = []
+ def waitforit(guid, node):
+ try:
+ # 20 minutes of grace for freshly provisioned nodes,
+ # 1 minute for nodes that were already up
node.wait_provisioning(
(20*60 if node._node_id in self._just_provisioned else 60)
)
# Prepare dependency installer now
node.prepare_dependencies()
+ except:
+ # Flag the failure so the dispatch loop stops queuing
+ # more nodes, then let the exception propagate
+ abort.append(None)
+ raise
+
+ for guid, node in self._elements.iteritems():
+ if abort:
+ break
+ if isinstance(node, self._node.Node):
+ self._logger.info("Waiting for Node %s configured at %s", guid, node.hostname)
+ runner.put(waitforit, guid, node)
+ runner.join()
+
except self._node.UnresponsiveNodeError:
# Uh...
- self._logger.warn("UNRESPONSIVE Node %s", node.hostname)
+ self._logger.warn("UNRESPONSIVE Nodes")
# Mark all dead nodes (which are unresponsive) on the blacklist
# and re-raise
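
The handler is truncated by the hunk; going by the comment, it walks the nodes, blacklists the unresponsive ones and re-raises. A hypothetical sketch of that shape, where node.is_alive() and self._blacklist are assumptions not shown in this excerpt:

# Hypothetical continuation only: is_alive() and _blacklist are
# assumed, not part of this hunk.
for guid, node in self._elements.iteritems():
    if isinstance(node, self._node.Node) and not node.is_alive():
        self._logger.warn("Blacklisting %s (unresponsive)", node.hostname)
        self._blacklist.add(node._node_id)
raise
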