X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Ftestbeds%2Fplanetlab%2Fnode.py;h=a1e9bcd3d943b206ce482f9ea3ff035afc141fef;hb=ba9d345fae54787c9bd99842d7f6a3aac0cbd5ea;hp=10ff5930cc4390456b3d05d4fd802c05574b9d75;hpb=fe42f33a7a49165888f9af1776001629bbe09b94;p=nepi.git diff --git a/src/nepi/testbeds/planetlab/node.py b/src/nepi/testbeds/planetlab/node.py index 10ff5930..a1e9bcd3 100644 --- a/src/nepi/testbeds/planetlab/node.py +++ b/src/nepi/testbeds/planetlab/node.py @@ -7,8 +7,38 @@ import operator import rspawn import time import os +import collections +import cStringIO +import resourcealloc +import socket +import sys +import logging +import ipaddr +import operator from nepi.util import server +from nepi.util import parallel + +import application + +MAX_VROUTE_ROUTES = 5 + +class UnresponsiveNodeError(RuntimeError): + pass + +def _castproperty(typ, propattr): + def _get(self): + return getattr(self, propattr) + def _set(self, value): + if value is not None or (isinstance(value, basestring) and not value): + value = typ(value) + return setattr(self, propattr, value) + def _del(self, value): + return delattr(self, propattr) + _get.__name__ = propattr + '_get' + _set.__name__ = propattr + '_set' + _del.__name__ = propattr + '_del' + return property(_get, _set, _del) class Node(object): BASEFILTERS = { @@ -21,16 +51,42 @@ class Node(object): # There are replacements that are applied with string formatting, # so '%' has to be escaped as '%%'. 'architecture' : ('arch','value'), - 'operating_system' : ('fcdistro','value'), + 'operatingSystem' : ('fcdistro','value'), 'pl_distro' : ('pldistro','value'), - 'min_reliability' : ('reliability%(timeframe)s', ']value'), - 'max_reliability' : ('reliability%(timeframe)s', '[value'), - 'min_bandwidth' : ('bw%(timeframe)s', ']value'), - 'max_bandwidth' : ('bw%(timeframe)s', '[value'), - } + 'city' : ('city','value'), + 'country' : ('country','value'), + 'region' : ('region','value'), + 'minReliability' : ('reliability%(timeframe)s', ']value'), + 'maxReliability' : ('reliability%(timeframe)s', '[value'), + 'minBandwidth' : ('bw%(timeframe)s', ']value'), + 'maxBandwidth' : ('bw%(timeframe)s', '[value'), + 'minLoad' : ('load%(timeframe)s', ']value'), + 'maxLoad' : ('load%(timeframe)s', '[value'), + 'minCpu' : ('cpu%(timeframe)s', ']value'), + 'maxCpu' : ('cpu%(timeframe)s', '[value'), + } + + RATE_FACTORS = ( + # (, , ) + ('bw%(timeframe)s', -0.001, 1024.0), + ('cpu%(timeframe)s', 0.1, 40.0), + ('load%(timeframe)s', -0.2, 3.0), + ('reliability%(timeframe)s', 1, 100.0), + ) DEPENDS_PIDFILE = '/tmp/nepi-depends.pid' DEPENDS_LOGFILE = '/tmp/nepi-depends.log' + RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm' + RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm' + + minReliability = _castproperty(float, '_minReliability') + maxReliability = _castproperty(float, '_maxReliability') + minBandwidth = _castproperty(float, '_minBandwidth') + maxBandwidth = _castproperty(float, '_maxBandwidth') + minCpu = _castproperty(float, '_minCpu') + maxCpu = _castproperty(float, '_maxCpu') + minLoad = _castproperty(float, '_minLoad') + maxLoad = _castproperty(float, '_maxLoad') def __init__(self, api=None): if not api: @@ -40,29 +96,67 @@ class Node(object): # Attributes self.hostname = None self.architecture = None - self.operating_system = None + self.operatingSystem = None self.pl_distro = None self.site = None - self.emulation = None - self.min_reliability = None - self.max_reliability = None - self.min_bandwidth = None - self.max_bandwidth = None + self.city = None + self.country = None + self.region = None + self.minReliability = None + self.maxReliability = None + self.minBandwidth = None + self.maxBandwidth = None + self.minCpu = None + self.maxCpu = None + self.minLoad = None + self.maxLoad = None self.min_num_external_ifaces = None self.max_num_external_ifaces = None self.timeframe = 'm' - # Applications add requirements to connected nodes + # Applications and routes add requirements to connected nodes self.required_packages = set() + self.required_vsys = set() + self.pythonpath = [] + self.rpmFusion = False + self.env = collections.defaultdict(list) + + # Some special applications - initialized when connected + self.multicast_forwarder = None # Testbed-derived attributes self.slicename = None self.ident_path = None self.server_key = None self.home_path = None + self.enable_cleanup = False # Those are filled when an actual node is allocated self._node_id = None + self._yum_dependencies = None + self._installed = False + + # Logging + self._logger = logging.getLogger('nepi.testbeds.planetlab') + + def _nepi_testbed_environment_setup_get(self): + command = cStringIO.StringIO() + command.write('export PYTHONPATH=$PYTHONPATH:%s' % ( + ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath]) + )) + command.write(' ; export PATH=$PATH:%s' % ( + ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath]) + )) + if self.env: + for envkey, envvals in self.env.iteritems(): + for envval in envvals: + command.write(' ; export %s=%s' % (envkey, envval)) + return command.getvalue() + def _nepi_testbed_environment_setup_set(self, value): + pass + _nepi_testbed_environment_setup = property( + _nepi_testbed_environment_setup_get, + _nepi_testbed_environment_setup_set) def build_filters(self, target_filters, filter_map): for attr, tag in filter_map.iteritems(): @@ -80,14 +174,23 @@ class Node(object): ) def find_candidates(self, filter_slice_id=None): + self._logger.info("Finding candidates for %s", self.make_filter_description()) + fields = ('node_id',) replacements = {'timeframe':self.timeframe} # get initial candidates (no tag filters) basefilters = self.build_filters({}, self.BASEFILTERS) + rootfilters = basefilters.copy() if filter_slice_id: basefilters['|slice_ids'] = (filter_slice_id,) + # only pick healthy nodes + basefilters['run_level'] = 'boot' + basefilters['boot_state'] = 'boot' + basefilters['node_type'] = 'regular' # nepi can only handle regular nodes (for now) + basefilters['>last_contact'] = int(time.time()) - 5*3600 # allow 5h out of contact, for timezone discrepancies + # keyword-only "pseudofilters" extra = {} if self.site: @@ -103,7 +206,7 @@ class Node(object): # don't bother if there's no filter defined if attr in applicable: - tagfilter = basefilters.copy() + tagfilter = rootfilters.copy() tagfilter['tagname'] = tagname % replacements tagfilter[expr % replacements] = getattr(self,attr) tagfilter['node_id'] = list(candidates) @@ -111,6 +214,31 @@ class Node(object): candidates &= set(map(operator.itemgetter('node_id'), self._api.GetNodeTags(filters=tagfilter, fields=fields))) + # filter by vsys tags - special case since it doesn't follow + # the usual semantics + if self.required_vsys: + newcandidates = collections.defaultdict(set) + + vsys_tags = self._api.GetNodeTags( + tagname='vsys', + node_id = list(candidates), + fields = ['node_id','value']) + + vsys_tags = map( + operator.itemgetter(['node_id','value']), + vsys_tags) + + required_vsys = self.required_vsys + for node_id, value in vsys_tags: + if value in required_vsys: + newcandidates[value].add(node_id) + + # take only those that have all the required vsys tags + newcandidates = reduce( + lambda accum, new : accum & new, + newcandidates.itervalues(), + candidates) + # filter by iface count if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None: # fetch interfaces for all, in one go @@ -131,18 +259,110 @@ class Node(object): len(ifaces.get(node_id,())) <= self.max_num_external_ifaces ) candidates = set(filter(predicate, candidates)) + + # make sure hostnames are resolvable + if candidates: + self._logger.info(" Found %s candidates. Checking for reachability...", len(candidates)) + + hostnames = dict(map(operator.itemgetter('node_id','hostname'), + self._api.GetNodes(list(candidates), ['node_id','hostname']) + )) + def resolvable(node_id): + try: + addr = socket.gethostbyname(hostnames[node_id]) + return addr is not None + except: + return False + candidates = set(parallel.pfilter(resolvable, candidates, + maxthreads = 16)) + + self._logger.info(" Found %s reachable candidates.", len(candidates)) return candidates + + def make_filter_description(self): + """ + Makes a human-readable description of filtering conditions + for find_candidates. + """ + + # get initial candidates (no tag filters) + filters = self.build_filters({}, self.BASEFILTERS) + + # keyword-only "pseudofilters" + if self.site: + filters['peer'] = self.site + + # filter by tag, one tag at a time + applicable = self.applicable_filters + for tagfilter in self.TAGFILTERS.iteritems(): + attr, (tagname, expr) = tagfilter + + # don't bother if there's no filter defined + if attr in applicable: + filters[attr] = getattr(self,attr) + + # filter by vsys tags - special case since it doesn't follow + # the usual semantics + if self.required_vsys: + filters['vsys'] = ','.join(list(self.required_vsys)) + + # filter by iface count + if self.min_num_external_ifaces is not None or self.max_num_external_ifaces is not None: + filters['num_ifaces'] = '-'.join([ + str(self.min_num_external_ifaces or '0'), + str(self.max_num_external_ifaces or 'inf') + ]) + + return '; '.join(map('%s: %s'.__mod__,filters.iteritems())) def assign_node_id(self, node_id): self._node_id = node_id self.fetch_node_info() + def unassign_node(self): + self._node_id = None + self.__dict__.update(self.__orig_attrs) + + def rate_nodes(self, nodes): + rates = collections.defaultdict(int) + tags = collections.defaultdict(dict) + replacements = {'timeframe':self.timeframe} + tagnames = [ tagname % replacements + for tagname, weight, default in self.RATE_FACTORS ] + + taginfo = self._api.GetNodeTags( + node_id=list(nodes), + tagname=tagnames, + fields=('node_id','tagname','value')) + + unpack = operator.itemgetter('node_id','tagname','value') + for value in taginfo: + node, tagname, value = unpack(value) + if value and value.lower() != 'n/a': + tags[tagname][int(node)] = float(value) + + for tagname, weight, default in self.RATE_FACTORS: + taginfo = tags[tagname % replacements].get + for node in nodes: + rates[node] += weight * taginfo(node,default) + + return map(rates.__getitem__, nodes) + def fetch_node_info(self): - info = self._api.GetNodes(self._node_id)[0] + orig_attrs = {} + + self._api.StartMulticall() + info = self._api.GetNodes(self._node_id) + tags = self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) + info, tags = self._api.FinishMulticall() + info = info[0] + tags = dict( (t['tagname'],t['value']) - for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) ) + for t in tags ) + orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces + orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces self.min_num_external_ifaces = None self.max_num_external_ifaces = None self.timeframe = 'm' @@ -151,14 +371,21 @@ class Node(object): for attr, tag in self.BASEFILTERS.iteritems(): if tag in info: value = info[tag] + if hasattr(self, attr): + orig_attrs[attr] = getattr(self, attr) setattr(self, attr, value) for attr, (tag,_) in self.TAGFILTERS.iteritems(): tag = tag % replacements if tag in tags: value = tags[tag] + if hasattr(self, attr): + orig_attrs[attr] = getattr(self, attr) + if not value or value.lower() == 'n/a': + value = None setattr(self, attr, value) if 'peer_id' in info: + orig_attrs['site'] = self.site self.site = self._api.peer_map[info['peer_id']] if 'interface_ids' in info: @@ -166,7 +393,10 @@ class Node(object): self.max_num_external_ifaces = len(info['interface_ids']) if 'ssh_rsa_key' in info: + orig_attrs['server_key'] = self.server_key self.server_key = info['ssh_rsa_key'] + + self.__orig_attrs = orig_attrs def validate(self): if self.home_path is None: @@ -176,88 +406,336 @@ class Node(object): if self.slicename is None: raise AssertionError, "Misconfigured node: unspecified slice" + def recover(self): + # Mark dependencies installed + self._installed = True + + # Clear load attributes, they impair re-discovery + self.minReliability = \ + self.maxReliability = \ + self.minBandwidth = \ + self.maxBandwidth = \ + self.minCpu = \ + self.maxCpu = \ + self.minLoad = \ + self.maxLoad = None + def install_dependencies(self): - if self.required_packages: - # TODO: make dependant on the experiment somehow... - pidfile = self.DEPENDS_PIDFILE - logfile = self.DEPENDS_LOGFILE - - # Start process in a "daemonized" way, using nohup and heavy - # stdin/out redirection to avoid connection issues - (out,err),proc = rspawn.remote_spawn( - "yum -y install %(packages)s" % { - 'packages' : ' '.join(self.required_packages), - }, - pidfile = pidfile, - stdout = logfile, - stderr = rspawn.STDOUT, - - host = self.hostname, - port = None, - user = self.slicename, - agent = None, - ident_key = self.ident_path, - server_key = self.server_key, - sudo = True - ) - - if proc.wait(): - raise RuntimeError, "Failed to set up application: %s %s" % (out,err,) - - def wait_dependencies(self, pidprobe=1, probe=10, pidmax=10): - if self.required_packages: - pidfile = self.DEPENDS_PIDFILE + if self.required_packages and not self._installed: + # If we need rpmfusion, we must install the repo definition and the gpg keys + if self.rpmFusion: + if self.operatingSystem == 'f12': + # Fedora 12 requires a different rpmfusion package + RPM_FUSION_URL = self.RPM_FUSION_URL_F12 + else: + # This one works for f13+ + RPM_FUSION_URL = self.RPM_FUSION_URL + + rpmFusion = ( + '( rpm -q $(rpm -q -p %(RPM_FUSION_URL)s) || rpm -i %(RPM_FUSION_URL)s ) &&' + ) % { + 'RPM_FUSION_URL' : RPM_FUSION_URL + } + else: + rpmFusion = '' - # get PID - pid = ppid = None - for probenum in xrange(pidmax): - pidtuple = rspawn.remote_check_pid( - pidfile = pidfile, + if rpmFusion: + (out,err),proc = server.popen_ssh_command( + rpmFusion, host = self.hostname, port = None, user = self.slicename, agent = None, ident_key = self.ident_path, - server_key = self.server_key + server_key = self.server_key, + timeout = 600, ) - if pidtuple: - pid, ppid = pidtuple - break - else: - time.sleep(pidprobe) - else: - raise RuntimeError, "Failed to obtain pidfile for dependency installer" + + if proc.wait(): + raise RuntimeError, "Failed to set up application: %s %s" % (out,err,) + + # Launch p2p yum dependency installer + self._yum_dependencies.async_setup() + + def wait_provisioning(self, timeout = 20*60): + # Wait for the p2p installer + sleeptime = 1.0 + totaltime = 0.0 + while not self.is_alive(): + time.sleep(sleeptime) + totaltime += sleeptime + sleeptime = min(30.0, sleeptime*1.5) + + if totaltime > timeout: + # PlanetLab has a 15' delay on configuration propagation + # If we're above that delay, the unresponsiveness is not due + # to this delay. + if not self.is_alive(verbose=True): + raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,) - # wait for it to finish - while rspawn.RUNNING is rspawn.remote_status( - pid, ppid, - host = self.hostname, - port = None, - user = self.slicename, - agent = None, - ident_key = self.ident_path, - server_key = self.server_key - ): - time.sleep(probe) + # Ensure the node is clean (no apps running that could interfere with operations) + if self.enable_cleanup: + self.do_cleanup() + + def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10): + # Wait for the p2p installer + if self._yum_dependencies and not self._installed: + self._yum_dependencies.async_setup_wait() + self._installed = True - def is_alive(self): + def is_alive(self, verbose = False): # Make sure all the paths are created where # they have to be created for deployment - (out,err),proc = server.popen_ssh_command( + (out,err),proc = server.eintr_retry(server.popen_ssh_command)( "echo 'ALIVE'", host = self.hostname, port = None, user = self.slicename, agent = None, ident_key = self.ident_path, - server_key = self.server_key + server_key = self.server_key, + timeout = 60, + err_on_timeout = False, + persistent = False ) if proc.wait(): + if verbose: + self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err) return False elif not err and out.strip() == 'ALIVE': return True else: + if verbose: + self._logger.warn("Unresponsive node %s got:\n%s%s", self.hostname, out, err) return False + def destroy(self): + if self.enable_cleanup: + self.do_cleanup() + + def blacklist(self): + if self._node_id: + self._logger.warn("Blacklisting malfunctioning node %s", self.hostname) + import util + util.appendBlacklist(self._node_id) + + def do_cleanup(self): + if self.testbed().recovering: + # WOW - not now + return + + self._logger.info("Cleaning up %s", self.hostname) + + cmds = [ + "sudo -S killall python tcpdump || /bin/true ; " + "sudo -S killall python tcpdump || /bin/true ; " + "sudo -S kill $(ps -N -T -o pid --no-heading | grep -v $PPID | sort) || /bin/true ", + "sudo -S killall -u %(slicename)s || /bin/true ", + "sudo -S killall -u root || /bin/true ", + "sudo -S killall -u %(slicename)s || /bin/true ", + "sudo -S killall -u root || /bin/true ", + ] + + for cmd in cmds: + (out,err),proc = server.popen_ssh_command( + # Some apps need two kills + cmd % { + 'slicename' : self.slicename , + }, + host = self.hostname, + port = None, + user = self.slicename, + agent = None, + ident_key = self.ident_path, + server_key = self.server_key, + tty = True, # so that ps -N -T works as advertised... + timeout = 60, + retry = 3 + ) + proc.wait() + + def prepare_dependencies(self): + # Configure p2p yum dependency installer + if self.required_packages and not self._installed: + self._yum_dependencies = application.YumDependency(self._api) + self._yum_dependencies.node = self + self._yum_dependencies.home_path = "nepi-yumdep" + self._yum_dependencies.depends = ' '.join(self.required_packages) + + def routing_method(self, routes, vsys_vnet): + """ + There are two methods, vroute and sliceip. + + vroute: + Modifies the node's routing table directly, validating that the IP + range lies within the network given by the slice's vsys_vnet tag. + This method is the most scalable for very small routing tables + that need not modify other routes (including the default) + + sliceip: + Uses policy routing and iptables filters to create per-sliver + routing tables. It's the most flexible way, but it doesn't scale + as well since only 155 routing tables can be created this way. + + This method will return the most appropriate routing method, which will + prefer vroute for small routing tables. + """ + + # For now, sliceip results in kernel panics + # so we HAVE to use vroute + return 'vroute' + + # We should not make the routing table grow too big + if len(routes) > MAX_VROUTE_ROUTES: + return 'sliceip' + + vsys_vnet = ipaddr.IPNetwork(vsys_vnet) + for route in routes: + dest, prefix, nexthop, metric = route + dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix)) + nexthop = ipaddr.IPAddress(nexthop) + if dest not in vsys_vnet or nexthop not in vsys_vnet: + return 'sliceip' + + return 'vroute' + + def format_route(self, route, dev, method, action): + dest, prefix, nexthop, metric = route + if method == 'vroute': + return ( + "%s %s%s gw %s %s" % ( + action, + dest, + (("/%d" % (prefix,)) if prefix and prefix != 32 else ""), + nexthop, + dev, + ) + ) + elif method == 'sliceip': + return ( + "route %s to %s%s via %s metric %s dev %s" % ( + action, + dest, + (("/%d" % (prefix,)) if prefix and prefix != 32 else ""), + nexthop, + metric or 1, + dev, + ) + ) + else: + raise AssertionError, "Unknown method" + + def _annotate_routes_with_devs(self, routes, devs, method): + dev_routes = [] + for route in routes: + for dev in devs: + if dev.routes_here(route): + dev_routes.append(tuple(route) + (dev.if_name,)) + + # Stop checking + break + else: + if method == 'sliceip': + dev_routes.append(tuple(route) + ('eth0',)) + else: + raise RuntimeError, "Route %s cannot be bound to any virtual interface " \ + "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs) + return dev_routes + + def configure_routes(self, routes, devs, vsys_vnet): + """ + Add the specified routes to the node's routing table + """ + rules = [] + method = self.routing_method(routes, vsys_vnet) + tdevs = set() + + # annotate routes with devices + dev_routes = self._annotate_routes_with_devs(routes, devs, method) + for route in dev_routes: + route, dev = route[:-1], route[-1] + + # Schedule rule + tdevs.add(dev) + rules.append(self.format_route(route, dev, method, 'add')) + + if method == 'sliceip': + rules = map('enable '.__add__, tdevs) + rules + + self._logger.info("Setting up routes for %s using %s", self.hostname, method) + self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules)) + + self.apply_route_rules(rules, method) + + self._configured_routes = set(routes) + self._configured_devs = tdevs + self._configured_method = method + + def reconfigure_routes(self, routes, devs, vsys_vnet): + """ + Updates the routes in the node's routing table to match + the given route list + """ + method = self._configured_method + + dev_routes = self._annotate_routes_with_devs(routes, devs, method) + + current = self._configured_routes + current_devs = self._configured_devs + + new = set(dev_routes) + new_devs = set(map(operator.itemgetter(-1), dev_routes)) + + deletions = current - new + insertions = new - current + + dev_deletions = current_devs - new_devs + dev_insertions = new_devs - current_devs + + # Generate rules + rules = [] + + # Rule deletions first + for route in deletions: + route, dev = route[:-1], route[-1] + rules.append(self.format_route(route, dev, method, 'del')) + + if method == 'sliceip': + # Dev deletions now + rules.extend(map('disable '.__add__, dev_deletions)) + + # Dev insertions now + rules.extend(map('enable '.__add__, dev_insertions)) + + # Rule insertions now + for route in insertions: + route, dev = route[:-1], dev[-1] + rules.append(self.format_route(route, dev, method, 'add')) + + # Apply + self.apply_route_rules(rules, method) + + self._configured_routes = dev_routes + self._configured_devs = new_devs + + def apply_route_rules(self, rules, method): + (out,err),proc = server.popen_ssh_command( + "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict( + home = server.shell_escape(self.home_path), + method = method), + host = self.hostname, + port = None, + user = self.slicename, + agent = None, + ident_key = self.ident_path, + server_key = self.server_key, + stdin = '\n'.join(rules), + timeout = 300 + ) + + if proc.wait() or err: + raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err) + elif out or err: + logger.debug("%s said: %s%s", method, out, err)