From 7306c27230a7652cb6d7e4fb59dcc855740874ac Mon Sep 17 00:00:00 2001
From: Claudio-Daniel Freire
Date: Wed, 22 Jun 2011 17:14:31 +0200
Subject: [PATCH] Blacklist nodes that are not so healthy

---
 src/nepi/testbeds/planetlab/execute.py      | 72 +++++++++++++++++++--
 src/nepi/testbeds/planetlab/metadata_v01.py |  6 --
 src/nepi/testbeds/planetlab/node.py         | 59 +++++++++++++++++
 src/nepi/util/parallel.py                   | 45 +++++++++++++
 4 files changed, 169 insertions(+), 13 deletions(-)

diff --git a/src/nepi/testbeds/planetlab/execute.py b/src/nepi/testbeds/planetlab/execute.py
index 8813ae49..3a1b65ab 100644
--- a/src/nepi/testbeds/planetlab/execute.py
+++ b/src/nepi/testbeds/planetlab/execute.py
@@ -6,6 +6,7 @@ from nepi.core import testbed_impl
 from nepi.util.constants import TIME_NOW
 from nepi.util.graphtools import mst
 from nepi.util import ipaddr2
+import sys
 import os
 import os.path
 import time
@@ -36,6 +37,8 @@ class TestbedController(testbed_impl.TestbedController):
         self._node = node
         self._interfaces = interfaces
         self._app = application
+        
+        self._blacklist = set()
 
     @property
     def home_directory(self):
@@ -99,12 +102,23 @@ class TestbedController(testbed_impl.TestbedController):
     do_poststep_configure = staticmethod(do_post_asynclaunch)
 
     def do_preconfigure(self):
-        # Perform resource discovery if we don't have
-        # specific resources assigned yet
-        self.do_resource_discovery()
-        
-        # Create PlanetLab slivers
-        self.do_provisioning()
+        while True:
+            # Perform resource discovery if we don't have
+            # specific resources assigned yet
+            self.do_resource_discovery()
+            
+            # Create PlanetLab slivers
+            self.do_provisioning()
+            
+            try:
+                # Wait for provisioning
+                self.do_wait_nodes()
+                
+                # Okay, all nodes came up
+                break
+            except self._node.UnresponsiveNodeError:
+                # Retry discovery with the dead nodes blacklisted
+                pass
         
         # Plan application deployment
         self.do_spanning_deployment_plan()
@@ -115,6 +129,11 @@
     def do_resource_discovery(self):
         to_provision = self._to_provision = set()
+        reserved = set(self._blacklist)
+        for guid, node in self._elements.iteritems():
+            if isinstance(node, self._node.Node) and node._node_id is not None:
+                reserved.add(node._node_id)
+        
         # Initial algo:
         #   look for perfectly defined nodes
         #   (ie: those with only one candidate)
@@ -124,9 +143,12 @@
             # If we have only one candidate, simply use it
             candidates = node.find_candidates(
                 filter_slice_id = self.slice_id)
+            candidates -= reserved
             if len(candidates) == 1:
-                node.assign_node_id(iter(candidates).next())
-            else:
+                node_id = iter(candidates).next()
+                node.assign_node_id(node_id)
+                reserved.add(node_id)
+            elif not candidates:
                 # Try again including unassigned nodes
                 candidates = node.find_candidates()
                 if len(candidates) > 1:
@@ -135,6 +157,7 @@
                     node_id = iter(candidates).next()
                     node.assign_node_id(node_id)
                     to_provision.add(node_id)
+                    reserved.add(node_id)
                 elif not candidates:
                     raise RuntimeError, "Cannot assign resources for node %s, no candidates with %s" % (guid,
                         node.make_filter_description())
@@ -149,6 +172,7 @@
             # If we have only one candidate, simply use it
             candidates = node.find_candidates(
                 filter_slice_id = self.slice_id)
+            candidates -= reserved
             
             reqs.append(candidates)
             nodes.append(node)
@@ -179,6 +203,40 @@
         # cleanup
         del self._to_provision
 
+    def do_wait_nodes(self):
+        for guid, node in self._elements.iteritems():
+            if isinstance(node, self._node.Node):
+                # Just inject configuration stuff
+                node.home_path = "nepi-node-%s" % (guid,)
+                node.ident_path = self.sliceSSHKey
+                node.slicename = self.slicename
+                
+                # Show the magic
+                print "PlanetLab Node", guid, "configured at", node.hostname
+        
+        try:
+            for guid, node in self._elements.iteritems():
+                if isinstance(node, self._node.Node):
+                    print "Waiting for Node", guid, "configured at", node.hostname,
+                    sys.stdout.flush()
+                    
+                    node.wait_provisioning()
+                    
+                    print "READY"
+        except self._node.UnresponsiveNodeError:
+            # Close the pending status line for the node that timed out
+            print "UNRESPONSIVE"
+            
+            # Mark all dead nodes (which are unresponsive) on the blacklist
+            # and re-raise
+            for guid, node in self._elements.iteritems():
+                if isinstance(node, self._node.Node):
+                    if not node.is_alive():
+                        print "Blacklisting", node.hostname, "for unresponsiveness"
+                        self._blacklist.add(node._node_id)
+                        node.unassign_node()
+            raise
+
     def do_spanning_deployment_plan(self):
         # Create application groups by collecting all applications
         # based on their hash - the hash should contain everything that
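Note: do_preconfigure now loops: discovery and provisioning are redone until every node survives the wait phase, and do_wait_nodes blacklists and unassigns whatever failed is_alive(), so the next round picks replacements. A minimal sketch of the pattern (discover, provision and wait_nodes are hypothetical stand-ins for the do_* methods above, not part of the codebase):

    class UnresponsiveNodeError(RuntimeError):
        pass

    def provision_with_retry(discover, provision, wait_nodes, blacklist):
        while True:
            discover(exclude = blacklist)   # blacklisted node_ids are skipped
            provision()
            try:
                wait_nodes()                # raises UnresponsiveNodeError on timeout
                return                      # every node came up
            except UnresponsiveNodeError:
                # wait_nodes already added the dead node_ids to blacklist,
                # so the next discovery round avoids them
                continue

As in the patch, there is no retry cap: the loop only ends once a full set of nodes comes up, or discovery runs out of candidates and raises.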
diff --git a/src/nepi/testbeds/planetlab/metadata_v01.py b/src/nepi/testbeds/planetlab/metadata_v01.py
index 85ae0993..8ca0e993 100644
--- a/src/nepi/testbeds/planetlab/metadata_v01.py
+++ b/src/nepi/testbeds/planetlab/metadata_v01.py
@@ -377,12 +377,6 @@ def configure_node(testbed_instance, guid):
     # Do some validations
     node.validate()
     
-    # recently provisioned nodes may not be up yet
-    sleeptime = 1.0
-    while not node.is_alive():
-        time.sleep(sleeptime)
-        sleeptime = min(30.0, sleeptime*1.5)
-    
     # this will be done in parallel in all nodes
     # this call only spawns the process
     node.install_dependencies()
diff --git a/src/nepi/testbeds/planetlab/node.py b/src/nepi/testbeds/planetlab/node.py
index 8557a95e..9e56c32f 100644
--- a/src/nepi/testbeds/planetlab/node.py
+++ b/src/nepi/testbeds/planetlab/node.py
@@ -10,8 +10,14 @@ import os
 import collections
 import cStringIO
 import resourcealloc
+import socket
+import sys
 
 from nepi.util import server
+from nepi.util import parallel
+
+class UnresponsiveNodeError(RuntimeError):
+    pass
 
 class Node(object):
     BASEFILTERS = {
@@ -101,6 +107,8 @@ class Node(object):
         )
 
     def find_candidates(self, filter_slice_id=None):
+        print >>sys.stderr, "Finding candidates for", self.make_filter_description()
+        
        fields = ('node_id',)
        replacements = {'timeframe':self.timeframe}
@@ -113,6 +121,8 @@ class Node(object):
         # only pick healthy nodes
         basefilters['run_level'] = 'boot'
         basefilters['boot_state'] = 'boot'
+        basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now)
+        basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies
         
         # keyword-only "pseudofilters"
         extra = {}
@@ -182,6 +192,24 @@ class Node(object):
                 len(ifaces.get(node_id,())) <= self.max_num_external_ifaces )
             candidates = set(filter(predicate, candidates))
+        
+        # make sure hostnames are resolvable
+        if candidates:
+            print >>sys.stderr, "  Found", len(candidates), "candidates. Checking for reachability..."
+            
+            hostnames = dict(map(operator.itemgetter('node_id','hostname'),
+                self._api.GetNodes(list(candidates), ['node_id','hostname'])
+            ))
+            def resolvable(node_id):
+                try:
+                    addr = socket.gethostbyname(hostnames[node_id])
+                    return addr is not None
+                except: # unresolvable or transient DNS failure: treat as unreachable
+                    return False
+            candidates = set(parallel.pfilter(resolvable, candidates,
+                maxthreads = 16))
+            
+            print >>sys.stderr, "  Found", len(candidates), "reachable candidates."
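Note: the PlanetLab database routinely lists nodes whose hostnames no longer resolve, so find_candidates now resolves each hostname once, in parallel, before any sliver is requested. A self-contained sketch of the same check using only the standard library (the patch itself uses nepi.util.parallel.pfilter; here hostnames maps node_id to hostname, as returned by GetNodes):

    import socket
    from multiprocessing.pool import ThreadPool

    def resolvable_subset(hostnames, maxthreads = 16):
        def resolvable(item):
            node_id, hostname = item
            try:
                socket.gethostbyname(hostname)  # blocking DNS lookup
                return node_id
            except socket.error:
                return None                     # unresolvable: drop the node
        pool = ThreadPool(maxthreads)
        try:
            results = pool.map(resolvable, hostnames.items())
        finally:
            pool.close()
        return set(node_id for node_id in results if node_id is not None)

Threads, not processes, are the right tool here: gethostbyname releases the GIL while it blocks, so the lookups genuinely proceed concurrently.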
         return candidates

@@ -225,11 +253,19 @@ class Node(object):
         self._node_id = node_id
         self.fetch_node_info()
 
+    def unassign_node(self):
+        self._node_id = None
+        self.__dict__.update(self.__orig_attrs)
+    
     def fetch_node_info(self):
+        orig_attrs = {}
+        
         info = self._api.GetNodes(self._node_id)[0]
         tags = dict( (t['tagname'],t['value'])
             for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
         
+        orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
+        orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
         self.min_num_external_ifaces = None
         self.max_num_external_ifaces = None
         self.timeframe = 'm'
@@ -238,14 +274,19 @@
         for attr, tag in self.BASEFILTERS.iteritems():
             if tag in info:
                 value = info[tag]
+                if hasattr(self, attr):
+                    orig_attrs[attr] = getattr(self, attr)
                 setattr(self, attr, value)
         for attr, (tag,_) in self.TAGFILTERS.iteritems():
             tag = tag % replacements
             if tag in tags:
                 value = tags[tag]
+                if hasattr(self, attr):
+                    orig_attrs[attr] = getattr(self, attr)
                 setattr(self, attr, value)
         
         if 'peer_id' in info:
+            orig_attrs['site'] = self.site
             self.site = self._api.peer_map[info['peer_id']]
         
         if 'interface_ids' in info:
             self.min_num_external_ifaces = \
             self.max_num_external_ifaces = len(info['interface_ids'])
@@ -253,7 +294,10 @@
         
         if 'ssh_rsa_key' in info:
+            orig_attrs['server_key'] = self.server_key
             self.server_key = info['ssh_rsa_key']
+        
+        self.__orig_attrs = orig_attrs
 
     def validate(self):
         if self.home_path is None:
@@ -291,6 +335,21 @@ class Node(object):
         if proc.wait():
             raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
 
+    def wait_provisioning(self):
+        # recently provisioned nodes may not be up yet
+        sleeptime = 1.0
+        totaltime = 0.0
+        while not self.is_alive():
+            time.sleep(sleeptime)
+            totaltime += sleeptime
+            sleeptime = min(30.0, sleeptime*1.5)
+            
+            if totaltime > 20*60:
+                # PlanetLab has a 15-minute delay on configuration propagation;
+                # if we're past that delay, the unresponsiveness is not due
+                # to propagation lag, so give up on this node.
+                raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
+
     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
         if self.required_packages:
             pidfile = self.DEPENDS_PIDFILE
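Note: wait_provisioning polls with a capped exponential backoff: probes start at 1s, stretch by 1.5x up to 30s, and only after 20 minutes, comfortably past PlanetLab's roughly 15-minute configuration-propagation delay, is the node declared dead. The schedule in isolation (a sketch; is_alive stands for any boolean probe such as Node.is_alive):

    import time

    def wait_until_alive(is_alive, timeout = 20*60, initial = 1.0,
            factor = 1.5, cap = 30.0):
        # Returns True once the probe succeeds, False on timeout
        sleeptime = initial
        totaltime = 0.0
        while not is_alive():
            if totaltime > timeout:
                return False    # past propagation delay: genuinely unresponsive
            time.sleep(sleeptime)
            totaltime += sleeptime
            sleeptime = min(cap, sleeptime * factor)
        return True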
diff --git a/src/nepi/util/parallel.py b/src/nepi/util/parallel.py
index d1169a55..abf22021 100644
--- a/src/nepi/util/parallel.py
+++ b/src/nepi/util/parallel.py
@@ -88,4 +88,49 @@ class ParallelMap(object):
                     raise StopIteration
 
 
+class ParallelFilter(ParallelMap):
+    class _FILTERED:
+        pass
+    def __filter(self, x):
+        # Tag rejected items with the _FILTERED sentinel so they can
+        # be dropped when the results are iterated
+        if self.filter_condition(x):
+            return x
+        else:
+            return self._FILTERED
+    
+    def __init__(self, filter_condition, maxthreads = None, maxqueue = None):
+        super(ParallelFilter, self).__init__(maxthreads, maxqueue, True)
+        self.filter_condition = filter_condition
+    
+    def put(self, what):
+        super(ParallelFilter, self).put(self.__filter, what)
+    
+    def put_nowait(self, what):
+        super(ParallelFilter, self).put_nowait(self.__filter, what)
+    
+    def __iter__(self):
+        for rv in super(ParallelFilter, self).__iter__():
+            if rv is not self._FILTERED:
+                yield rv
+
+
+def pmap(mapping, iterable, maxthreads = None, maxqueue = None):
+    mapper = ParallelMap(
+        maxthreads = maxthreads,
+        maxqueue = maxqueue,
+        results = True)
+    mapper.start()
+    for elem in iterable:
+        mapper.put(mapping, elem)
+    return list(mapper)
+
+def pfilter(condition, iterable, maxthreads = None, maxqueue = None):
+    filtrer = ParallelFilter(
+        condition,
+        maxthreads = maxthreads,
+        maxqueue = maxqueue)
+    filtrer.start()
+    for elem in iterable:
+        filtrer.put(elem)
+    return list(filtrer)
+
-- 
2.47.0
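Note: pfilter behaves like the builtin filter but evaluates the predicate on worker threads, which only pays off when the predicate blocks on I/O (DNS, SSH) rather than burning CPU. A hypothetical usage mirroring the reachability check above (the hostnames are illustrative stand-ins):

    import socket
    from nepi.util import parallel

    def looks_reachable(hostname):
        try:
            socket.gethostbyname(hostname)  # blocking network call
            return True
        except socket.error:
            return False

    hostnames = ["planetlab1.example.net", "planetlab2.example.net"]
    # Result order is not guaranteed, so collect into a set
    alive = set(parallel.pfilter(looks_reachable, hostnames, maxthreads = 4))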