X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Ftestbeds%2Fplanetlab%2Fnode.py;h=a924e12526c34bfb007bb9152bbd390140a178d9;hb=70c33a3b0fe396cce12d3d1c0eb6cf40ce537a95;hp=e6a20b5cf79de7ab30e129b7fe5cf69d572bfa32;hpb=f474019d633e83dbe4554be579e56f27d863f307;p=nepi.git diff --git a/src/nepi/testbeds/planetlab/node.py b/src/nepi/testbeds/planetlab/node.py index e6a20b5c..a924e125 100644 --- a/src/nepi/testbeds/planetlab/node.py +++ b/src/nepi/testbeds/planetlab/node.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # -*- coding: utf-8 -*- from constants import TESTBED_ID @@ -15,6 +14,7 @@ import sys import logging import ipaddr import operator +import re from nepi.util import server from nepi.util import parallel @@ -76,6 +76,7 @@ class Node(object): DEPENDS_PIDFILE = '/tmp/nepi-depends.pid' DEPENDS_LOGFILE = '/tmp/nepi-depends.log' + RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm' RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm' @@ -88,10 +89,11 @@ class Node(object): minLoad = _castproperty(float, '_minLoad') maxLoad = _castproperty(float, '_maxLoad') - def __init__(self, api=None): + def __init__(self, api=None, sliceapi=None): if not api: api = plcapi.PLCAPI() self._api = api + self._sliceapi = sliceapi or api # Attributes self.hostname = None @@ -112,7 +114,7 @@ class Node(object): self.maxLoad = None self.min_num_external_ifaces = None self.max_num_external_ifaces = None - self.timeframe = 'm' + self._timeframe = 'w' # Applications and routes add requirements to connected nodes self.required_packages = set() @@ -129,7 +131,8 @@ class Node(object): self.ident_path = None self.server_key = None self.home_path = None - self.enable_cleanup = False + self.enable_proc_cleanup = False + self.enable_home_cleanup = False # Those are filled when an actual node is allocated self._node_id = None @@ -138,6 +141,27 @@ class Node(object): # Logging self._logger = logging.getLogger('nepi.testbeds.planetlab') + + def set_timeframe(self, timeframe): + if timeframe == "latest": + self._timeframe = "" + elif timeframe == "month": + self._timeframe = "m" + elif timeframe == "year": + self._timeframe = "y" + else: + self._timeframe = "w" + + def get_timeframe(self): + if self._timeframe == "": + return "latest" + if self._timeframe == "m": + return "month" + if self._timeframe == "y": + return "year" + return "week" + + timeframe = property(get_timeframe, set_timeframe) def _nepi_testbed_environment_setup_get(self): command = cStringIO.StringIO() @@ -152,8 +176,10 @@ class Node(object): for envval in envvals: command.write(' ; export %s=%s' % (envkey, envval)) return command.getvalue() + def _nepi_testbed_environment_setup_set(self, value): pass + _nepi_testbed_environment_setup = property( _nepi_testbed_environment_setup_get, _nepi_testbed_environment_setup_set) @@ -177,7 +203,7 @@ class Node(object): self._logger.info("Finding candidates for %s", self.make_filter_description()) fields = ('node_id',) - replacements = {'timeframe':self.timeframe} + replacements = {'timeframe':self._timeframe} # get initial candidates (no tag filters) basefilters = self.build_filters({}, self.BASEFILTERS) @@ -197,8 +223,8 @@ class Node(object): extra['peer'] = self.site candidates = set(map(operator.itemgetter('node_id'), - self._api.GetNodes(filters=basefilters, fields=fields, **extra))) - + self._sliceapi.GetNodes(filters=basefilters, fields=fields, **extra))) + # filter by tag, one tag at a time applicable = self.applicable_filters for tagfilter in self.TAGFILTERS.iteritems(): @@ -208,22 +234,22 @@ class Node(object): if attr in applicable: tagfilter = rootfilters.copy() tagfilter['tagname'] = tagname % replacements - tagfilter[expr % replacements] = getattr(self,attr) + tagfilter[expr % replacements] = str(getattr(self,attr)) tagfilter['node_id'] = list(candidates) - + candidates &= set(map(operator.itemgetter('node_id'), - self._api.GetNodeTags(filters=tagfilter, fields=fields))) - + self._sliceapi.GetNodeTags(filters=tagfilter, fields=fields))) + # filter by vsys tags - special case since it doesn't follow # the usual semantics if self.required_vsys: newcandidates = collections.defaultdict(set) - vsys_tags = self._api.GetNodeTags( + vsys_tags = self._sliceapi.GetNodeTags( tagname='vsys', node_id = list(candidates), fields = ['node_id','value']) - + vsys_tags = map( operator.itemgetter(['node_id','value']), vsys_tags) @@ -245,7 +271,7 @@ class Node(object): filters = basefilters.copy() filters['node_id'] = list(candidates) ifaces = dict(map(operator.itemgetter('node_id','interface_ids'), - self._api.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) )) + self._sliceapi.GetNodes(filters=basefilters, fields=('node_id','interface_ids')) )) # filter candidates by interface count if self.min_num_external_ifaces is not None and self.max_num_external_ifaces is not None: @@ -259,17 +285,19 @@ class Node(object): len(ifaces.get(node_id,())) <= self.max_num_external_ifaces ) candidates = set(filter(predicate, candidates)) - + # make sure hostnames are resolvable + hostnames = dict() if candidates: self._logger.info(" Found %s candidates. Checking for reachability...", len(candidates)) - + hostnames = dict(map(operator.itemgetter('node_id','hostname'), - self._api.GetNodes(list(candidates), ['node_id','hostname']) + self._sliceapi.GetNodes(list(candidates), ['node_id','hostname']) )) + def resolvable(node_id): try: - addr = socket.gethostbyname(hostnames[node_id]) + addr = server.gethostbyname(hostnames[node_id]) return addr is not None except: return False @@ -277,8 +305,14 @@ class Node(object): maxthreads = 16)) self._logger.info(" Found %s reachable candidates.", len(candidates)) - - return candidates + + for h in hostnames.keys(): + if h not in candidates: + del hostnames[h] + + hostnames = dict((v,k) for k, v in hostnames.iteritems()) + + return hostnames def make_filter_description(self): """ @@ -322,6 +356,7 @@ class Node(object): def unassign_node(self): self._node_id = None + self.hostip = None try: orig_attrs = self.__orig_attrs @@ -335,20 +370,20 @@ class Node(object): def rate_nodes(self, nodes): rates = collections.defaultdict(int) tags = collections.defaultdict(dict) - replacements = {'timeframe':self.timeframe} + replacements = {'timeframe':self._timeframe} tagnames = [ tagname % replacements for tagname, weight, default in self.RATE_FACTORS ] - - taginfo = self._api.GetNodeTags( + + taginfo = self._sliceapi.GetNodeTags( node_id=list(nodes), tagname=tagnames, fields=('node_id','tagname','value')) - + unpack = operator.itemgetter('node_id','tagname','value') for value in taginfo: node, tagname, value = unpack(value) if value and value.lower() != 'n/a': - tags[tagname][int(node)] = float(value) + tags[tagname][node] = float(value) for tagname, weight, default in self.RATE_FACTORS: taginfo = tags[tagname % replacements].get @@ -360,10 +395,7 @@ class Node(object): def fetch_node_info(self): orig_attrs = {} - self._api.StartMulticall() - info = self._api.GetNodes(self._node_id) - tags = self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) - info, tags = self._api.FinishMulticall() + info, tags = self._sliceapi.GetNodeInfo(self._node_id) info = info[0] tags = dict( (t['tagname'],t['value']) @@ -373,9 +405,10 @@ class Node(object): orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces self.min_num_external_ifaces = None self.max_num_external_ifaces = None - self.timeframe = 'm' + if not self._timeframe: self._timeframe = 'w' - replacements = {'timeframe':self.timeframe} + replacements = {'timeframe':self._timeframe} + for attr, tag in self.BASEFILTERS.iteritems(): if tag in info: value = info[tag] @@ -394,7 +427,7 @@ class Node(object): if 'peer_id' in info: orig_attrs['site'] = self.site - self.site = self._api.peer_map[info['peer_id']] + self.site = self._sliceapi.peer_map[info['peer_id']] if 'interface_ids' in info: self.min_num_external_ifaces = \ @@ -404,6 +437,8 @@ class Node(object): orig_attrs['server_key'] = self.server_key self.server_key = info['ssh_rsa_key'] + self.hostip = server.gethostbyname(self.hostname) + try: self.__orig_attrs except AttributeError: @@ -443,17 +478,17 @@ class Node(object): RPM_FUSION_URL = self.RPM_FUSION_URL rpmFusion = ( - 'sudo -S rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s' + 'rpm -q rpmfusion-free-release || sudo -S rpm -i %(RPM_FUSION_URL)s' ) % { 'RPM_FUSION_URL' : RPM_FUSION_URL } else: rpmFusion = '' - + if rpmFusion: (out,err),proc = server.popen_ssh_command( rpmFusion, - host = self.hostname, + host = self.hostip, port = None, user = self.slicename, agent = None, @@ -463,7 +498,9 @@ class Node(object): ) if proc.wait(): - raise RuntimeError, "Failed to set up application: %s %s" % (out,err,) + if self.check_bad_host(out,err): + self.blacklist() + raise RuntimeError, "Failed to set up application on host %s: %s %s" % (self.hostname, out,err,) # Launch p2p yum dependency installer self._yum_dependencies.async_setup() @@ -485,9 +522,11 @@ class Node(object): raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,) # Ensure the node is clean (no apps running that could interfere with operations) - if self.enable_cleanup: - self.do_cleanup() - + if self.enable_proc_cleanup: + self.do_proc_cleanup() + if self.enable_home_cleanup: + self.do_home_cleanup() + def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10): # Wait for the p2p installer if self._yum_dependencies and not self._installed: @@ -499,7 +538,7 @@ class Node(object): # they have to be created for deployment (out,err),proc = server.eintr_retry(server.popen_ssh_command)( "echo 'ALIVE'", - host = self.hostname, + host = self.hostip, port = None, user = self.slicename, agent = None, @@ -522,21 +561,21 @@ class Node(object): return False def destroy(self): - if self.enable_cleanup: - self.do_cleanup() + if self.enable_proc_cleanup: + self.do_proc_cleanup() def blacklist(self): if self._node_id: self._logger.warn("Blacklisting malfunctioning node %s", self.hostname) import util - util.appendBlacklist(self._node_id) + util.appendBlacklist(self.hostname) - def do_cleanup(self): + def do_proc_cleanup(self): if self.testbed().recovering: # WOW - not now return - self._logger.info("Cleaning up %s", self.hostname) + self._logger.info("Cleaning up processes on %s", self.hostname) cmds = [ "sudo -S killall python tcpdump || /bin/true ; " @@ -554,7 +593,7 @@ class Node(object): cmd % { 'slicename' : self.slicename , }, - host = self.hostname, + host = self.hostip, port = None, user = self.slicename, agent = None, @@ -565,7 +604,36 @@ class Node(object): retry = 3 ) proc.wait() - + + def do_home_cleanup(self): + if self.testbed().recovering: + # WOW - not now + return + + self._logger.info("Cleaning up home on %s", self.hostname) + + cmds = [ + "find . -maxdepth 1 \( -name '.cache' -o -name '.local' -o -name '.config' -o -name 'nepi-*' \) -execdir rm -rf {} + " + ] + + for cmd in cmds: + (out,err),proc = server.popen_ssh_command( + # Some apps need two kills + cmd % { + 'slicename' : self.slicename , + }, + host = self.hostip, + port = None, + user = self.slicename, + agent = None, + ident_key = self.ident_path, + server_key = self.server_key, + tty = True, # so that ps -N -T works as advertised... + timeout = 60, + retry = 3 + ) + proc.wait() + def prepare_dependencies(self): # Configure p2p yum dependency installer if self.required_packages and not self._installed: @@ -601,10 +669,10 @@ class Node(object): if len(routes) > MAX_VROUTE_ROUTES: return 'sliceip' - vsys_vnet = ipaddr.IPNetwork(vsys_vnet) + vsys_vnet = ipaddr.IPv4Network(vsys_vnet) for route in routes: - dest, prefix, nexthop, metric = route - dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix)) + dest, prefix, nexthop, metric, device = route + dest = ipaddr.IPv4Network("%s/%d" % (dest,prefix)) nexthop = ipaddr.IPAddress(nexthop) if dest not in vsys_vnet or nexthop not in vsys_vnet: return 'sliceip' @@ -612,7 +680,7 @@ class Node(object): return 'vroute' def format_route(self, route, dev, method, action): - dest, prefix, nexthop, metric = route + dest, prefix, nexthop, metric, device = route if method == 'vroute': return ( "%s %s%s gw %s %s" % ( @@ -735,7 +803,7 @@ class Node(object): "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict( home = server.shell_escape(self.home_path), method = method), - host = self.hostname, + host = self.hostip, port = None, user = self.slicename, agent = None, @@ -750,3 +818,10 @@ class Node(object): elif out or err: logger.debug("%s said: %s%s", method, out, err) + def check_bad_host(self, out, err): + badre = re.compile(r'(?:' + r"curl: [(]\d+[)] Couldn't resolve host 'download1[.]rpmfusion[.]org'" + r'|Error: disk I/O error' + r')', + re.I) + return badre.search(out) or badre.search(err)