X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Ftestbeds%2Fplanetlab%2Fnode.py;h=59485ef2cc0e7127d19833913d4fe807cfced220;hb=62aa9b7a5b846baed4dd304e2d8fa442f9f35b51;hp=c357462574ab905fa95f5838fc908d6f2f65b69e;hpb=4ccc2f121afe8dddb36778f31c518167d76bebe2;p=nepi.git diff --git a/src/nepi/testbeds/planetlab/node.py b/src/nepi/testbeds/planetlab/node.py index c3574625..59485ef2 100644 --- a/src/nepi/testbeds/planetlab/node.py +++ b/src/nepi/testbeds/planetlab/node.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # -*- coding: utf-8 -*- from constants import TESTBED_ID @@ -15,6 +14,7 @@ import sys import logging import ipaddr import operator +import re from nepi.util import server from nepi.util import parallel @@ -64,7 +64,15 @@ class Node(object): 'maxLoad' : ('load%(timeframe)s', '[value'), 'minCpu' : ('cpu%(timeframe)s', ']value'), 'maxCpu' : ('cpu%(timeframe)s', '[value'), - } + } + + RATE_FACTORS = ( + # (<tagname>, <weight>, <default>) + ('bw%(timeframe)s', -0.001, 1024.0), + ('cpu%(timeframe)s', 0.1, 40.0), + ('load%(timeframe)s', -0.2, 3.0), + ('reliability%(timeframe)s', 1, 100.0), + ) DEPENDS_PIDFILE = '/tmp/nepi-depends.pid' DEPENDS_LOGFILE = '/tmp/nepi-depends.log' @@ -80,10 +88,11 @@ class Node(object): minLoad = _castproperty(float, '_minLoad') maxLoad = _castproperty(float, '_maxLoad') - def __init__(self, api=None): + def __init__(self, api=None, sliceapi=None): if not api: api = plcapi.PLCAPI() self._api = api + self._sliceapi = sliceapi or api # Attributes self.hostname = None @@ -104,7 +113,7 @@ class Node(object): self.maxLoad = None self.min_num_external_ifaces = None self.max_num_external_ifaces = None - self.timeframe = 'm' + self._timeframe = 'w' # Applications and routes add requirements to connected nodes self.required_packages = set() @@ -113,12 +122,16 @@ class Node(object): self.rpmFusion = False self.env = collections.defaultdict(list) + # Some special applications - initialized when connected + self.multicast_forwarder = None + # Testbed-derived 
attributes self.slicename = None self.ident_path = None self.server_key = None self.home_path = None - self.enable_cleanup = False + self.enable_proc_cleanup = False + self.enable_home_cleanup = False # Those are filled when an actual node is allocated self._node_id = None @@ -127,6 +140,27 @@ class Node(object): # Logging self._logger = logging.getLogger('nepi.testbeds.planetlab') + + def set_timeframe(self, timeframe): + if timeframe == "latest": + self._timeframe = "" + elif timeframe == "month": + self._timeframe = "m" + elif timeframe == "year": + self._timeframe = "y" + else: + self._timeframe = "w" + + def get_timeframe(self): + if self._timeframe == "": + return "latest" + if self._timeframe == "m": + return "month" + if self._timeframe == "y": + return "year" + return "week" + + timeframe = property(get_timeframe, set_timeframe) def _nepi_testbed_environment_setup_get(self): command = cStringIO.StringIO() @@ -141,8 +175,10 @@ class Node(object): for envval in envvals: command.write(' ; export %s=%s' % (envkey, envval)) return command.getvalue() + def _nepi_testbed_environment_setup_set(self, value): pass + _nepi_testbed_environment_setup = property( _nepi_testbed_environment_setup_get, _nepi_testbed_environment_setup_set) @@ -166,7 +202,7 @@ class Node(object): self._logger.info("Finding candidates for %s", self.make_filter_description()) fields = ('node_id',) - replacements = {'timeframe':self.timeframe} + replacements = {'timeframe':self._timeframe} # get initial candidates (no tag filters) basefilters = self.build_filters({}, self.BASEFILTERS) @@ -186,8 +222,8 @@ class Node(object): extra['peer'] = self.site candidates = set(map(operator.itemgetter('node_id'), - self._api.GetNodes(filters=basefilters, fields=fields, **extra))) - + self._sliceapi.GetNodes(filters=basefilters, fields=fields, **extra))) + # filter by tag, one tag at a time applicable = self.applicable_filters for tagfilter in self.TAGFILTERS.iteritems(): @@ -197,22 +233,22 @@ class 
Node(object): if attr in applicable: tagfilter = rootfilters.copy() tagfilter['tagname'] = tagname % replacements - tagfilter[expr % replacements] = getattr(self,attr) + tagfilter[expr % replacements] = str(getattr(self,attr)) tagfilter['node_id'] = list(candidates) - + candidates &= set(map(operator.itemgetter('node_id'), - self._api.GetNodeTags(filters=tagfilter, fields=fields))) - + self._sliceapi.GetNodeTags(filters=tagfilter, fields=fields))) + # filter by vsys tags - special case since it doesn't follow # the usual semantics if self.required_vsys: newcandidates = collections.defaultdict(set) - vsys_tags = self._api.GetNodeTags( + vsys_tags = self._sliceapi.GetNodeTags( tagname='vsys', node_id = list(candidates), fields = ['node_id','value']) - + vsys_tags = map( operator.itemgetter(['node_id','value']), vsys_tags) @@ -234,7 +270,7 @@ class Node(object): filters = basefilters.copy() filters['node_id'] = list(candidates) ifaces = dict(map(operator.itemgetter('node_id','interface_ids'), - self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) )) + self._sliceapi.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) )) # filter candidates by interface count if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None: @@ -248,14 +284,16 @@ class Node(object): len(ifaces.get(node_id,())) <= self.max_num_external_ifaces ) candidates = set(filter(predicate, candidates)) - + # make sure hostnames are resolvable + hostnames = dict() if candidates: self._logger.info(" Found %s candidates. 
Checking for reachability...", len(candidates)) - + hostnames = dict(map(operator.itemgetter('node_id','hostname'), - self._api.GetNodes(list(candidates), ['node_id','hostname']) + self._sliceapi.GetNodes(list(candidates), ['node_id','hostname']) )) + def resolvable(node_id): try: addr = socket.gethostbyname(hostnames[node_id]) @@ -266,8 +304,14 @@ class Node(object): maxthreads = 16)) self._logger.info(" Found %s reachable candidates.", len(candidates)) - - return candidates + + for h in hostnames.keys(): + if h not in candidates: + del hostnames[h] + + hostnames = dict((v,k) for k, v in hostnames.iteritems()) + + return hostnames def make_filter_description(self): """ @@ -311,22 +355,59 @@ class Node(object): def unassign_node(self): self._node_id = None - self.__dict__.update(self.__orig_attrs) + self.hostip = None + + try: + orig_attrs = self.__orig_attrs + except AttributeError: + return + + for key, value in orig_attrs.iteritems(): + setattr(self, key, value) + del self.__orig_attrs + def rate_nodes(self, nodes): + rates = collections.defaultdict(int) + tags = collections.defaultdict(dict) + replacements = {'timeframe':self._timeframe} + tagnames = [ tagname % replacements + for tagname, weight, default in self.RATE_FACTORS ] + + taginfo = self._sliceapi.GetNodeTags( + node_id=list(nodes), + tagname=tagnames, + fields=('node_id','tagname','value')) + + unpack = operator.itemgetter('node_id','tagname','value') + for value in taginfo: + node, tagname, value = unpack(value) + if value and value.lower() != 'n/a': + tags[tagname][node] = float(value) + + for tagname, weight, default in self.RATE_FACTORS: + taginfo = tags[tagname % replacements].get + for node in nodes: + rates[node] += weight * taginfo(node,default) + + return map(rates.__getitem__, nodes) + def fetch_node_info(self): orig_attrs = {} - info = self._api.GetNodes(self._node_id)[0] + info, tags = self._sliceapi.GetNodeInfo(self._node_id) + info = info[0] + tags = dict( (t['tagname'],t['value']) - for 
t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) ) + for t in tags ) orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces self.min_num_external_ifaces = None self.max_num_external_ifaces = None - self.timeframe = 'm' + if not self._timeframe: self._timeframe = 'w' - replacements = {'timeframe':self.timeframe} + replacements = {'timeframe':self._timeframe} + for attr, tag in self.BASEFILTERS.iteritems(): if tag in info: value = info[tag] @@ -339,11 +420,13 @@ class Node(object): value = tags[tag] if hasattr(self, attr): orig_attrs[attr] = getattr(self, attr) + if not value or value.lower() == 'n/a': + value = None setattr(self, attr, value) if 'peer_id' in info: orig_attrs['site'] = self.site - self.site = self._api.peer_map[info['peer_id']] + self.site = self._sliceapi.peer_map[info['peer_id']] if 'interface_ids' in info: self.min_num_external_ifaces = \ @@ -353,7 +436,12 @@ class Node(object): orig_attrs['server_key'] = self.server_key self.server_key = info['ssh_rsa_key'] - self.__orig_attrs = orig_attrs + self.hostip = socket.gethostbyname(self.hostname) + + try: + self.__orig_attrs + except AttributeError: + self.__orig_attrs = orig_attrs def validate(self): if self.home_path is None: @@ -389,7 +477,7 @@ class Node(object): RPM_FUSION_URL = self.RPM_FUSION_URL rpmFusion = ( - '( rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s ) &&' + 'rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || sudo -S rpm -i %(RPM_FUSION_URL)s' ) % { 'RPM_FUSION_URL' : RPM_FUSION_URL } @@ -404,10 +492,13 @@ class Node(object): user = self.slicename, agent = None, ident_key = self.ident_path, - server_key = self.server_key + server_key = self.server_key, + timeout = 600, ) if proc.wait(): + if self.check_bad_host(out,err): + self.blacklist() raise RuntimeError, "Failed to set up application: %s %s" % (out,err,) # Launch p2p yum dependency installer @@ -426,19 
+517,22 @@ class Node(object): # PlanetLab has a 15' delay on configuration propagation # If we're above that delay, the unresponsiveness is not due # to this delay. - raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,) + if not self.is_alive(verbose=True): + raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,) # Ensure the node is clean (no apps running that could interfere with operations) - if self.enable_cleanup: - self.do_cleanup() - + if self.enable_proc_cleanup: + self.do_proc_cleanup() + if self.enable_home_cleanup: + self.do_home_cleanup() + def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10): # Wait for the p2p installer if self._yum_dependencies and not self._installed: self._yum_dependencies.async_setup_wait() self._installed = True - def is_alive(self): + def is_alive(self, verbose = False): # Make sure all the paths are created where # they have to be created for deployment (out,err),proc = server.eintr_retry(server.popen_ssh_command)( @@ -448,26 +542,39 @@ class Node(object): user = self.slicename, agent = None, ident_key = self.ident_path, - server_key = self.server_key + server_key = self.server_key, + timeout = 60, + err_on_timeout = False, + persistent = False ) if proc.wait(): + if verbose: + self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err) return False elif not err and out.strip() == 'ALIVE': return True else: + if verbose: + self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err) return False def destroy(self): - if self.enable_cleanup: - self.do_cleanup() + if self.enable_proc_cleanup: + self.do_proc_cleanup() - def do_cleanup(self): + def blacklist(self): + if self._node_id: + self._logger.warn("Blacklisting malfunctioning node %s", self.hostname) + import util + util.appendBlacklist(self.hostname) + + def do_proc_cleanup(self): if self.testbed().recovering: # WOW - not now return - self._logger.info("Cleaning up %s", self.hostname) 
+ self._logger.info("Cleaning up processes on %s", self.hostname) cmds = [ "sudo -S killall python tcpdump || /bin/true ; " @@ -492,9 +599,40 @@ class Node(object): ident_key = self.ident_path, server_key = self.server_key, tty = True, # so that ps -N -T works as advertised... + timeout = 60, + retry = 3 ) proc.wait() - + + def do_home_cleanup(self): + if self.testbed().recovering: + # WOW - not now + return + + self._logger.info("Cleaning up home on %s", self.hostname) + + cmds = [ + "find . -maxdepth 1 ! -name '.bash*' ! -name '.' -execdir rm -rf {} + " + ] + + for cmd in cmds: + (out,err),proc = server.popen_ssh_command( + # Some apps need two kills + cmd % { + 'slicename' : self.slicename , + }, + host = self.hostname, + port = None, + user = self.slicename, + agent = None, + ident_key = self.ident_path, + server_key = self.server_key, + tty = True, # so that ps -N -T works as advertised... + timeout = 60, + retry = 3 + ) + proc.wait() + def prepare_dependencies(self): # Configure p2p yum dependency installer if self.required_packages and not self._installed: @@ -530,10 +668,10 @@ class Node(object): if len(routes) > MAX_VROUTE_ROUTES: return 'sliceip' - vsys_vnet = ipaddr.IPNetwork(vsys_vnet) + vsys_vnet = ipaddr.IPv4Network(vsys_vnet) for route in routes: - dest, prefix, nexthop, metric = route - dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix)) + dest, prefix, nexthop, metric, device = route + dest = ipaddr.IPv4Network("%s/%d" % (dest,prefix)) nexthop = ipaddr.IPAddress(nexthop) if dest not in vsys_vnet or nexthop not in vsys_vnet: return 'sliceip' @@ -541,7 +679,7 @@ class Node(object): return 'vroute' def format_route(self, route, dev, method, action): - dest, prefix, nexthop, metric = route + dest, prefix, nexthop, metric, device = route if method == 'vroute': return ( "%s %s%s gw %s %s" % ( @@ -670,7 +808,8 @@ class Node(object): agent = None, ident_key = self.ident_path, server_key = self.server_key, - stdin = '\n'.join(rules) + stdin = '\n'.join(rules), 
+ timeout = 300 ) if proc.wait() or err: @@ -678,3 +817,10 @@ class Node(object): elif out or err: logger.debug("%s said: %s%s", method, out, err) + def check_bad_host(self, out, err): + badre = re.compile(r'(?:' + r"curl: [(]\d+[)] Couldn't resolve host 'download1[.]rpmfusion[.]org'" + r'|Error: disk I/O error' + r')', + re.I) + return badre.search(out) or badre.search(err)