from nepi.util.constants import TIME_NOW
from nepi.util.graphtools import mst
from nepi.util import ipaddr2
+import sys
import os
import os.path
import time
# NOTE(review): fragment of a mangled diff - pieces of an __init__ body,
# a property header, and a staticmethod alias; the owning definitions are
# not fully visible here, so only comments are added.
self._node = node
self._interfaces = interfaces
self._app = application
+
# Track node_ids of nodes found dead, so resource discovery skips them.
+        self._blacklist = set()
@property
def home_directory(self):
do_poststep_configure = staticmethod(do_post_asynclaunch)
def do_preconfigure(self):
    """Discover resources, provision PlanetLab slivers, and wait for the
    provisioned nodes to come up, retrying the whole discovery +
    provisioning cycle whenever nodes turn out to be unresponsive
    (do_wait_nodes blacklists the dead nodes before re-raising, so each
    retry works with a fresh candidate set).

    NOTE(review): reconstructed post-patch version of this method from a
    mangled diff (hunk markers stripped, indentation restored).
    """
    while True:
        # Perform resource discovery if we don't have
        # specific resources assigned yet
        self.do_resource_discovery()

        # Create PlanetLab slivers
        self.do_provisioning()

        try:
            # Wait for provisioning; raises
            # self._node.UnresponsiveNodeError if a node stays dead.
            self.do_wait_nodes()

            # All nodes answered - stop retrying.
            break
        except self._node.UnresponsiveNodeError:
            # Dead nodes were blacklisted by do_wait_nodes; retry.
            # NOTE(review): unbounded retry loop - if every candidate
            # set keeps failing this spins forever; consider a cap.
            pass

    # Plan application deployment
    self.do_spanning_deployment_plan()
def do_resource_discovery(self):
# NOTE(review): fragment of a mangled diff - context lines are elided,
# including the `for guid, node in ...` loop header that the candidate
# matching below runs under; annotated in place rather than rewritten.
to_provision = self._to_provision = set()
# Seed the reserved set with blacklisted nodes plus every node_id that
# is already assigned, so no node is handed out twice.
+        reserved = set(self._blacklist)
+        for guid, node in self._elements.iteritems():
+            if isinstance(node, self._node.Node) and node._node_id is not None:
+                reserved.add(node._node_id)
+
# Initial algo:
# look for perfectly defined nodes
# (ie: those with only one candidate)
# If we have only one candidate, simply use it
candidates = node.find_candidates(
filter_slice_id = self.slice_id)
# Never consider reserved/blacklisted nodes as candidates.
+            candidates -= reserved
if len(candidates) == 1:
-                node.assign_node_id(iter(candidates).next())
-            else:
# Exactly one candidate: take it and reserve it immediately.
+                node_id = iter(candidates).next()
+                node.assign_node_id(node_id)
+                reserved.add(node_id)
# The patch narrows the fallback from a bare `else` to the no-candidate
# case only.
+            elif not candidates:
# Try again including unassigned nodes
candidates = node.find_candidates()
if len(candidates) > 1:
node_id = iter(candidates).next()
node.assign_node_id(node_id)
to_provision.add(node_id)
+                    reserved.add(node_id)
elif not candidates:
# NOTE(review): "sith" in the message below is a typo for "with";
# left untouched here because runtime strings are code, not comments.
raise RuntimeError, "Cannot assign resources for node %s, no candidates sith %s" % (guid,
node.make_filter_description())
# If we have only one candidate, simply use it
candidates = node.find_candidates(
filter_slice_id = self.slice_id)
+            candidates -= reserved
reqs.append(candidates)
nodes.append(node)
# cleanup
del self._to_provision
+ def do_wait_nodes(self):
+ for guid, node in self._elements.iteritems():
+ if isinstance(node, self._node.Node):
+ # Just inject configuration stuff
+ node.home_path = "nepi-node-%s" % (guid,)
+ node.ident_path = self.sliceSSHKey
+ node.slicename = self.slicename
+
+ # Show the magic
+ print "PlanetLab Node", guid, "configured at", node.hostname
+
+ try:
+ for guid, node in self._elements.iteritems():
+ if isinstance(node, self._node.Node):
+ print "Waiting for Node", guid, "configured at", node.hostname,
+ sys.stdout.flush()
+
+ node.wait_provisioning()
+
+ print "READY"
+ except self._node.UnresponsiveNodeError:
+ # Uh...
+ print "UNRESPONSIVE"
+
+ # Mark all dead nodes (which are unresponsive) on the blacklist
+ # and re-raise
+ for guid, node in self._elements.iteritems():
+ if isinstance(node, self._node.Node):
+ if not node.is_alive():
+ print "Blacklisting", node.hostname, "for unresponsiveness"
+ self._blacklist.add(node._node_id)
+ node.unassign_node()
+ raise
+
def do_spanning_deployment_plan(self):
# NOTE(review): fragment of a mangled diff - most of this method's body
# is elided; only annotated in place.
# Create application groups by collecting all applications
# based on their hash - the hash should contain everything that
# Do some validations
node.validate()
# NOTE(review): the removed inline is_alive()/backoff wait below was
# moved by this patch into Node.wait_provisioning.
-        # recently provisioned nodes may not be up yet
-        sleeptime = 1.0
-        while not node.is_alive():
-            time.sleep(sleeptime)
-            sleeptime = min(30.0, sleeptime*1.5)
-
# this will be done in parallel in all nodes
# this call only spawns the process
node.install_dependencies()
import collections
import cStringIO
import resourcealloc
+import socket
+import sys
from nepi.util import server
+from nepi.util import parallel
+
class UnresponsiveNodeError(RuntimeError):
    """Raised when a provisioned PlanetLab node cannot be reached within
    the provisioning timeout (see Node.wait_provisioning); callers react
    by blacklisting the node and retrying resource discovery.
    """
    pass
class Node(object):
BASEFILTERS = {
)
def find_candidates(self, filter_slice_id=None):
# NOTE(review): fragment of a mangled diff - several context lines
# (including the construction of `basefilters` and `predicate`) are
# elided; annotated in place rather than rewritten.
+        print >>sys.stderr, "Finding candidates for", self.make_filter_description()
+
fields = ('node_id',)
replacements = {'timeframe':self.timeframe}
# only pick healthy nodes
basefilters['run_level'] = 'boot'
basefilters['boot_state'] = 'boot'
+        basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
+        basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
# keyword-only "pseudofilters"
extra = {}
len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
candidates = set(filter(predicate, candidates))
+
+        # make sure hostnames are resolvable
+        if candidates:
+            print >>sys.stderr, "  Found", len(candidates), "candidates. Checking for reachability..."
+
+            hostnames = dict(map(operator.itemgetter('node_id','hostname'),
+                self._api.GetNodes(list(candidates), ['node_id','hostname'])
+                ))
# DNS check runs on a 16-thread pool; any failure counts as unreachable.
+            def resolvable(node_id):
+                try:
+                    addr = socket.gethostbyname(hostnames[node_id])
+                    return addr is not None
+                except:
# NOTE(review): bare except swallows everything (incl. KeyboardInterrupt);
# catching socket.error/KeyError would be safer - flagged, not changed.
+                    return False
+            candidates = set(parallel.pfilter(resolvable, candidates,
+                maxthreads = 16))
+
+            print >>sys.stderr, "  Found", len(candidates), "reachable candidates."
return candidates
# NOTE(review): the two lines below are the tail of assign_node_id,
# whose def line is elided by the diff.
self._node_id = node_id
self.fetch_node_info()
def unassign_node(self):
    """Release the PlanetLab node currently bound to this object.

    Clears the node_id and restores the attributes that
    fetch_node_info() snapshotted into __orig_attrs before overwriting
    them, so the object can be matched against a fresh candidate set.

    NOTE(review): reconstructed from a mangled diff (pure-addition hunk).
    """
    self._node_id = None
    # __orig_attrs is saved by fetch_node_info() just before it clobbers
    # the filter-related attributes.
    self.__dict__.update(self.__orig_attrs)
def fetch_node_info(self):
# NOTE(review): diff fragment - possible elided context lines make a
# rewrite unsafe; annotated in place. The added lines snapshot every
# attribute this method overwrites into orig_attrs so that
# unassign_node() can restore them via self.__orig_attrs.
+        orig_attrs = {}
+
info = self._api.GetNodes(self._node_id)[0]
tags = dict( (t['tagname'],t['value'])
for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
# Save current values before they are reset below.
+        orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
+        orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
self.min_num_external_ifaces = None
self.max_num_external_ifaces = None
self.timeframe = 'm'
for attr, tag in self.BASEFILTERS.iteritems():
if tag in info:
value = info[tag]
# Only snapshot attributes that actually exist on this instance.
+                if hasattr(self, attr):
+                    orig_attrs[attr] = getattr(self, attr)
setattr(self, attr, value)
for attr, (tag,_) in self.TAGFILTERS.iteritems():
tag = tag % replacements
if tag in tags:
value = tags[tag]
+                if hasattr(self, attr):
+                    orig_attrs[attr] = getattr(self, attr)
setattr(self, attr, value)
if 'peer_id' in info:
# NOTE(review): unlike the hasattr-guarded saves above, this assumes
# self.site already exists - presumably a class-level default; verify.
+            orig_attrs['site'] = self.site
self.site = self._api.peer_map[info['peer_id']]
if 'interface_ids' in info:
self.max_num_external_ifaces = len(info['interface_ids'])
if 'ssh_rsa_key' in info:
# NOTE(review): same unguarded-attribute assumption for self.server_key.
+            orig_attrs['server_key'] = self.server_key
self.server_key = info['ssh_rsa_key']
+
+        self.__orig_attrs = orig_attrs
def validate(self):
# NOTE(review): fragment - validate()'s body and the separate method
# that owns `proc`/`out`/`err` below are elided by the diff.
if self.home_path is None:
if proc.wait():
raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
def wait_provisioning(self):
    """Block until this node answers is_alive() probes, polling with
    exponential backoff (1s doubling by 1.5x, capped at 30s).

    Raises:
        UnresponsiveNodeError: if the node is still unreachable after
            roughly 20 minutes of probing.

    NOTE(review): reconstructed from a mangled diff; the Py2-only
    `raise Exc, "msg"` form was replaced with `raise Exc("msg")`, which
    is equivalent and valid in both Python 2 and 3 (message unchanged).
    """
    # recently provisioned nodes may not be up yet
    sleeptime = 1.0
    totaltime = 0.0
    while not self.is_alive():
        time.sleep(sleeptime)
        totaltime += sleeptime
        sleeptime = min(30.0, sleeptime*1.5)

        if totaltime > 20*60:
            # PlanetLab has a 15' delay on configuration propagation
            # If we're above that delay, the unresponsiveness is not due
            # to this delay.
            raise UnresponsiveNodeError("Unresponsive host %s" % (self.hostname,))
def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
# NOTE(review): fragment - the body is elided by the diff. pidprobe/probe
# look like poll intervals in seconds and pidmax/probemax like retry
# counts - TODO confirm against the full source.
if self.required_packages:
pidfile = self.DEPENDS_PIDFILE
raise StopIteration
class ParallelFilter(ParallelMap):
    """Parallel analogue of the builtin filter(): items are mapped on the
    ParallelMap worker pool through a predicate wrapper, and iterating the
    filter yields only the items the predicate accepted.

    NOTE(review): reconstructed from a mangled diff (pure-addition hunk);
    depends on ParallelMap, defined elsewhere in this module.
    """
    class _FILTERED:
        # Unique sentinel: marks rejected items so __iter__ can drop them.
        # A sentinel class is used because None (or any value) could be a
        # legitimate element of the filtered iterable.
        pass

    def __filter(self, x):
        # Map step: pass accepted values through unchanged, replace
        # rejected ones with the _FILTERED sentinel.
        if self.filter_condition(x):
            return x
        else:
            return self._FILTERED

    def __init__(self, filter_condition, maxthreads = None, maxqueue = None):
        # Third positional argument (results=True) makes ParallelMap
        # collect return values so they can be iterated.
        super(ParallelFilter, self).__init__(maxthreads, maxqueue, True)
        self.filter_condition = filter_condition

    def put(self, what):
        # Wrap the item with the predicate before handing it to the pool.
        super(ParallelFilter, self).put(self.__filter, what)

    def put_nowait(self, what):
        super(ParallelFilter, self).put_nowait(self.__filter, what)

    def __iter__(self):
        # Yield everything the predicate let through; identity check
        # against the sentinel keeps falsy-but-accepted values.
        for rv in super(ParallelFilter, self).__iter__():
            if rv is not self._FILTERED:
                yield rv
+
+
def pmap(mapping, iterable, maxthreads = None, maxqueue = None):
    """Parallel map(): apply `mapping` to every element of `iterable` on
    a ParallelMap thread pool and return the results as a list.

    Parameters:
        mapping: callable applied to each element.
        iterable: source of elements.
        maxthreads, maxqueue: pool sizing, forwarded to ParallelMap.
    """
    mapper = ParallelMap(
        maxthreads = maxthreads,
        maxqueue = maxqueue,
        results = True)
    mapper.start()
    for elem in iterable:
        # BUG FIX: the original called mapper.put(elem), silently dropping
        # `mapping`. ParallelMap.put expects (callable, *args) - see
        # ParallelFilter.put, which forwards (self.__filter, what) - so the
        # mapping callable must be passed along with each element.
        mapper.put(mapping, elem)
    return list(mapper)
+
def pfilter(condition, iterable, maxthreads = None, maxqueue = None):
    """Parallel filter(): return a list of the elements of `iterable`
    for which `condition` is true, evaluating the predicate on a
    ParallelFilter thread pool.
    """
    worker = ParallelFilter(
        condition,
        maxthreads = maxthreads,
        maxqueue = maxqueue)
    worker.start()
    # Feed every element to the pool; iteration collects the survivors.
    for item in iterable:
        worker.put(item)
    return list(worker)
+