X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Fnepi%2Ftestbeds%2Fplanetlab%2Fnode.py;h=4936491898b0a967d5a372c33818d73f81013339;hb=514d86a8bdd4e5df384abd5f884a3c267d005390;hp=99ba2aa57b4b8aa208f7bbd3a2bc20f7a5957891;hpb=77989fabbd3186885cbd47c6cde9eb7493227f27;p=nepi.git

diff --git a/src/nepi/testbeds/planetlab/node.py b/src/nepi/testbeds/planetlab/node.py
index 99ba2aa5..49364918 100644
--- a/src/nepi/testbeds/planetlab/node.py
+++ b/src/nepi/testbeds/planetlab/node.py
@@ -12,13 +12,34 @@ import cStringIO
 import resourcealloc
 import socket
 import sys
+import logging
+import ipaddr
+import operator
 
 from nepi.util import server
 from nepi.util import parallel
 
+import application
+
+MAX_VROUTE_ROUTES = 5
+
 class UnresponsiveNodeError(RuntimeError):
     pass
 
+def _castproperty(typ, propattr):
+    def _get(self):
+        return getattr(self, propattr)
+    def _set(self, value):
+        # Coerce unless the value is None or an empty string
+        if value is not None and not (isinstance(value, basestring) and not value):
+            value = typ(value)
+        return setattr(self, propattr, value)
+    def _del(self):
+        return delattr(self, propattr)
+    _get.__name__ = propattr + '_get'
+    _set.__name__ = propattr + '_set'
+    _del.__name__ = propattr + '_del'
+    return property(_get, _set, _del)
+
 class Node(object):
     BASEFILTERS = {
         # Map Node attribute to plcapi filter name
@@ -32,17 +53,41 @@ class Node(object):
         'architecture' : ('arch','value'),
         'operatingSystem' : ('fcdistro','value'),
         'pl_distro' : ('pldistro','value'),
+        'city' : ('city','value'),
+        'country' : ('country','value'),
+        'region' : ('region','value'),
         'minReliability' : ('reliability%(timeframe)s', ']value'),
         'maxReliability' : ('reliability%(timeframe)s', '[value'),
         'minBandwidth' : ('bw%(timeframe)s', ']value'),
         'maxBandwidth' : ('bw%(timeframe)s', '[value'),
-    }
+        'minLoad' : ('load%(timeframe)s', ']value'),
+        'maxLoad' : ('load%(timeframe)s', '[value'),
+        'minCpu' : ('cpu%(timeframe)s', ']value'),
+        'maxCpu' : ('cpu%(timeframe)s', '[value'),
+    }
+
+    RATE_FACTORS = (
+        # (<tag name>, <weight>, <default value>)
+        ('bw%(timeframe)s', -0.001, 1024.0),
+        ('cpu%(timeframe)s', 0.1, 40.0),
+        ('load%(timeframe)s', -0.2, 3.0),
+        ('reliability%(timeframe)s', 1, 100.0),
+    )
 
     DEPENDS_PIDFILE = '/tmp/nepi-depends.pid'
     DEPENDS_LOGFILE = '/tmp/nepi-depends.log'
 
     RPM_FUSION_URL = 'http://download1.rpmfusion.org/free/fedora/rpmfusion-free-release-stable.noarch.rpm'
     RPM_FUSION_URL_F12 = 'http://download1.rpmfusion.org/free/fedora/releases/12/Everything/x86_64/os/rpmfusion-free-release-12-1.noarch.rpm'
 
+    minReliability = _castproperty(float, '_minReliability')
+    maxReliability = _castproperty(float, '_maxReliability')
+    minBandwidth = _castproperty(float, '_minBandwidth')
+    maxBandwidth = _castproperty(float, '_maxBandwidth')
+    minCpu = _castproperty(float, '_minCpu')
+    maxCpu = _castproperty(float, '_maxCpu')
+    minLoad = _castproperty(float, '_minLoad')
+    maxLoad = _castproperty(float, '_maxLoad')
+
     def __init__(self, api=None):
         if not api:
             api = plcapi.PLCAPI()
@@ -54,10 +99,17 @@ class Node(object):
         self.operatingSystem = None
         self.pl_distro = None
         self.site = None
+        self.city = None
+        self.country = None
+        self.region = None
         self.minReliability = None
         self.maxReliability = None
         self.minBandwidth = None
         self.maxBandwidth = None
+        self.minCpu = None
+        self.maxCpu = None
+        self.minLoad = None
+        self.maxLoad = None
         self.min_num_external_ifaces = None
         self.max_num_external_ifaces = None
         self.timeframe = 'm'
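
NOTE (illustration, not part of the patch): the _castproperty helper above builds property descriptors that coerce assigned values to a given type, which is what lets constraint attributes such as minCpu or maxLoad accept both numbers and the string values that arrive from testbed metadata. A minimal standalone sketch of the same idea, with hypothetical names:

    # Type-coercing property, analogous to _castproperty above.
    def cast_property(typ, storage):
        def _get(self):
            return getattr(self, storage, None)
        def _set(self, value):
            # Coerce unless the value is None or an empty string
            if value is not None and value != '':
                value = typ(value)
            setattr(self, storage, value)
        return property(_get, _set)

    class Constraints(object):
        minCpu = cast_property(float, '_minCpu')

    c = Constraints()
    c.minCpu = "40"          # stored as the float 40.0
    print(c.minCpu + 2.0)    # 42.0
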
@@ -69,17 +121,25 @@ class Node(object):
         self.rpmFusion = False
         self.env = collections.defaultdict(list)
 
+        # Some special applications - initialized when connected
+        self.multicast_forwarder = None
+
         # Testbed-derived attributes
         self.slicename = None
         self.ident_path = None
         self.server_key = None
         self.home_path = None
+        self.enable_cleanup = False
 
         # Those are filled when an actual node is allocated
         self._node_id = None
+        self._yum_dependencies = None
+        self._installed = False
+
+        # Logging
+        self._logger = logging.getLogger('nepi.testbeds.planetlab')
 
-    @property
-    def _nepi_testbed_environment_setup(self):
+    def _nepi_testbed_environment_setup_get(self):
         command = cStringIO.StringIO()
         command.write('export PYTHONPATH=$PYTHONPATH:%s' % (
             ':'.join(["${HOME}/"+server.shell_escape(s) for s in self.pythonpath])
@@ -92,6 +152,11 @@ class Node(object):
         for envval in envvals:
             command.write(' ; export %s=%s' % (envkey, envval))
         return command.getvalue()
+    def _nepi_testbed_environment_setup_set(self, value):
+        pass
+    _nepi_testbed_environment_setup = property(
+        _nepi_testbed_environment_setup_get,
+        _nepi_testbed_environment_setup_set)
 
     def build_filters(self, target_filters, filter_map):
         for attr, tag in filter_map.iteritems():
@@ -109,7 +174,7 @@ class Node(object):
         )
 
     def find_candidates(self, filter_slice_id=None):
-        print >>sys.stderr, "Finding candidates for", self.make_filter_description()
+        self._logger.info("Finding candidates for %s", self.make_filter_description())
 
         fields = ('node_id',)
         replacements = {'timeframe':self.timeframe}
@@ -197,7 +262,7 @@ class Node(object):
 
         # make sure hostnames are resolvable
         if candidates:
-            print >>sys.stderr, " Found", len(candidates), "candidates. Checking for reachability..."
+            self._logger.info(" Found %s candidates. Checking for reachability...", len(candidates))
             hostnames = dict(map(operator.itemgetter('node_id','hostname'),
                 self._api.GetNodes(list(candidates), ['node_id','hostname'])
             ))
@@ -211,7 +276,7 @@ class Node(object):
 
             candidates = set(parallel.pfilter(resolvable, candidates,
                 maxthreads = 16))
-            print >>sys.stderr, " Found", len(candidates), "reachable candidates."
+            self._logger.info(" Found %s reachable candidates.", len(candidates))
 
         return candidates
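
NOTE (illustration, not part of the patch): find_candidates above drops candidates whose hostnames do not resolve before any SSH check is attempted, using nepi.util.parallel.pfilter with up to 16 threads. The same pre-filter can be sketched with only the standard library (helper names are hypothetical):

    import socket
    from multiprocessing.pool import ThreadPool

    def resolvable(hostname):
        # True if the hostname has at least one address record
        try:
            socket.getaddrinfo(hostname, None)
            return True
        except socket.gaierror:
            return False

    def filter_resolvable(hostnames, maxthreads=16):
        pool = ThreadPool(maxthreads)
        try:
            flags = pool.map(resolvable, hostnames)
        finally:
            pool.close()
        return [h for h, ok in zip(hostnames, flags) if ok]

    # filter_resolvable(['planetlab1.example.org', 'no-such-host.invalid'])
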
@@ -257,14 +322,52 @@ class Node(object):
 
     def unassign_node(self):
         self._node_id = None
-        self.__dict__.update(self.__orig_attrs)
+
+        try:
+            orig_attrs = self.__orig_attrs
+        except AttributeError:
+            return
+
+        for key, value in orig_attrs.iteritems():
+            setattr(self, key, value)
+        del self.__orig_attrs
+
+    def rate_nodes(self, nodes):
+        rates = collections.defaultdict(int)
+        tags = collections.defaultdict(dict)
+        replacements = {'timeframe':self.timeframe}
+        tagnames = [ tagname % replacements
+                     for tagname, weight, default in self.RATE_FACTORS ]
+
+        taginfo = self._api.GetNodeTags(
+            node_id=list(nodes),
+            tagname=tagnames,
+            fields=('node_id','tagname','value'))
+
+        unpack = operator.itemgetter('node_id','tagname','value')
+        for value in taginfo:
+            node, tagname, value = unpack(value)
+            if value and value.lower() != 'n/a':
+                tags[tagname][int(node)] = float(value)
+
+        for tagname, weight, default in self.RATE_FACTORS:
+            taginfo = tags[tagname % replacements].get
+            for node in nodes:
+                rates[node] += weight * taginfo(node, default)
+
+        return map(rates.__getitem__, nodes)
+
     def fetch_node_info(self):
         orig_attrs = {}
-        info = self._api.GetNodes(self._node_id)[0]
+
+        self._api.StartMulticall()
+        info = self._api.GetNodes(self._node_id)
+        tags = self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value'))
+        info, tags = self._api.FinishMulticall()
+        info = info[0]
+
         tags = dict( (t['tagname'],t['value'])
-                     for t in self._api.GetNodeTags(node_id=self._node_id, fields=('tagname','value')) )
+                     for t in tags )
 
         orig_attrs['min_num_external_ifaces'] = self.min_num_external_ifaces
         orig_attrs['max_num_external_ifaces'] = self.max_num_external_ifaces
@@ -285,6 +388,8 @@ class Node(object):
                 value = tags[tag]
                 if hasattr(self, attr):
                     orig_attrs[attr] = getattr(self, attr)
+                if not value or value.lower() == 'n/a':
+                    value = None
                 setattr(self, attr, value)
 
         if 'peer_id' in info:
@@ -299,7 +404,10 @@ class Node(object):
             orig_attrs['server_key'] = self.server_key
             self.server_key = info['ssh_rsa_key']
 
-        self.__orig_attrs = orig_attrs
+        try:
+            self.__orig_attrs
+        except AttributeError:
+            self.__orig_attrs = orig_attrs
 
     def validate(self):
         if self.home_path is None:
@@ -309,12 +417,22 @@ class Node(object):
         if self.slicename is None:
             raise AssertionError, "Misconfigured node: unspecified slice"
 
+    def recover(self):
+        # Mark dependencies installed
+        self._installed = True
+
+        # Clear load attributes, they impair re-discovery
+        self.minReliability = \
+        self.maxReliability = \
+        self.minBandwidth = \
+        self.maxBandwidth = \
+        self.minCpu = \
+        self.maxCpu = \
+        self.minLoad = \
+        self.maxLoad = None
+
     def install_dependencies(self):
-        if self.required_packages:
-            # TODO: make dependant on the experiment somehow...
-            pidfile = self.DEPENDS_PIDFILE
-            logfile = self.DEPENDS_LOGFILE
-
+        if self.required_packages and not self._installed:
             # If we need rpmfusion, we must install the repo definition and the gpg keys
             if self.rpmFusion:
                 if self.operatingSystem == 'f12':
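
NOTE (illustration, not part of the patch): rate_nodes, added above, scores each candidate node as a weighted sum of its PLC tag values, falling back to a per-tag default when the tag is missing or reported as 'n/a' (the %(timeframe)s placeholder expands to the configured timeframe, e.g. 'm'). The arithmetic reduces to the standalone sketch below; weights and defaults are copied from RATE_FACTORS, the sample tag values are invented:

    RATE_FACTORS = (
        # (tag name, weight, default)
        ('bwm', -0.001, 1024.0),
        ('cpum', 0.1, 40.0),
        ('loadm', -0.2, 3.0),
        ('reliabilitym', 1, 100.0),
    )

    def rate(node_tags):
        # node_tags maps tag name -> float value for one node
        return sum(weight * node_tags.get(tag, default)
                   for tag, weight, default in RATE_FACTORS)

    # A reliable, lightly loaded node outranks a busy, unreliable one:
    print(rate({'bwm': 5000.0, 'cpum': 80.0, 'loadm': 0.5, 'reliabilitym': 98.0}))  # 100.9
    print(rate({'bwm': 5000.0, 'cpum': 10.0, 'loadm': 8.0, 'reliabilitym': 60.0}))  # 54.4
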
@@ -332,31 +450,26 @@ class Node(object):
             else:
                 rpmFusion = ''
 
-            # Start process in a "daemonized" way, using nohup and heavy
-            # stdin/out redirection to avoid connection issues
-            (out,err),proc = rspawn.remote_spawn(
-                "( %(rpmfusion)s yum -y install %(packages)s && echo SUCCESS || echo FAILURE )" % {
-                    'packages' : ' '.join(self.required_packages),
-                    'rpmfusion' : rpmFusion,
-                },
-                pidfile = pidfile,
-                stdout = logfile,
-                stderr = rspawn.STDOUT,
+            if rpmFusion:
+                (out,err),proc = server.popen_ssh_command(
+                    rpmFusion,
+                    host = self.hostname,
+                    port = None,
+                    user = self.slicename,
+                    agent = None,
+                    ident_key = self.ident_path,
+                    server_key = self.server_key,
+                    timeout = 600,
+                    )
-                host = self.hostname,
-                port = None,
-                user = self.slicename,
-                agent = None,
-                ident_key = self.ident_path,
-                server_key = self.server_key,
-                sudo = True
-                )
-
+
+                if proc.wait():
+                    raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
-            if proc.wait():
-                raise RuntimeError, "Failed to set up application: %s %s" % (out,err,)
+
+            # Launch p2p yum dependency installer
+            self._yum_dependencies.async_setup()
 
     def wait_provisioning(self, timeout = 20*60):
-        # recently provisioned nodes may not be up yet
+        # Wait for the p2p installer
         sleeptime = 1.0
         totaltime = 0.0
         while not self.is_alive():
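
NOTE (illustration, not part of the patch): install_dependencies now only installs the rpmfusion repository synchronously and then launches the yum installation in the background via application.YumDependency.async_setup(); wait_dependencies, in the next hunk, joins it with async_setup_wait(). The start/join control flow is essentially the pattern below (a simplified stand-in written for illustration, not the real YumDependency class):

    import threading

    class AsyncSetup(object):
        """Run a setup step in the background and join it on demand."""
        def __init__(self, setup_fn):
            self._setup_fn = setup_fn
            self._thread = None

        def async_setup(self):
            self._thread = threading.Thread(target=self._setup_fn)
            self._thread.start()

        def async_setup_wait(self):
            if self._thread is not None:
                self._thread.join()

    # dep = AsyncSetup(lambda: install_packages())   # install_packages is hypothetical
    # dep.async_setup()        # kicked off from install_dependencies()
    # ... rest of the deployment proceeds ...
    # dep.async_setup_wait()   # joined from wait_dependencies()
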
@@ -368,66 +481,20 @@ class Node(object):
                 # PlanetLab has a 15' delay on configuration propagation
                 # If we're above that delay, the unresponsiveness is not due
                 # to this delay.
-                raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
+                if not self.is_alive(verbose=True):
+                    raise UnresponsiveNodeError, "Unresponsive host %s" % (self.hostname,)
+
+        # Ensure the node is clean (no apps running that could interfere with operations)
+        if self.enable_cleanup:
+            self.do_cleanup()
 
     def wait_dependencies(self, pidprobe=1, probe=0.5, pidmax=10, probemax=10):
-        if self.required_packages:
-            pidfile = self.DEPENDS_PIDFILE
-
-            # get PID
-            pid = ppid = None
-            for probenum in xrange(pidmax):
-                pidtuple = rspawn.remote_check_pid(
-                    pidfile = pidfile,
-                    host = self.hostname,
-                    port = None,
-                    user = self.slicename,
-                    agent = None,
-                    ident_key = self.ident_path,
-                    server_key = self.server_key
-                    )
-                if pidtuple:
-                    pid, ppid = pidtuple
-                    break
-                else:
-                    time.sleep(pidprobe)
-            else:
-                raise RuntimeError, "Failed to obtain pidfile for dependency installer"
-
-            # wait for it to finish
-            while rspawn.RUNNING is rspawn.remote_status(
-                    pid, ppid,
-                    host = self.hostname,
-                    port = None,
-                    user = self.slicename,
-                    agent = None,
-                    ident_key = self.ident_path,
-                    server_key = self.server_key
-                    ):
-                time.sleep(probe)
-                probe = min(probemax, 1.5*probe)
-
-            # check results
-            logfile = self.DEPENDS_LOGFILE
-
-            (out,err),proc = server.popen_ssh_command(
-                "cat %s" % (server.shell_escape(logfile),),
-                host = self.hostname,
-                port = None,
-                user = self.slicename,
-                agent = None,
-                ident_key = self.ident_path,
-                server_key = self.server_key
-                )
-
-            if proc.wait():
-                raise RuntimeError, "Failed to install dependencies: %s %s" % (out,err,)
-
-            success = out.strip().rsplit('\n',1)[-1].strip() == 'SUCCESS'
-            if not success:
-                raise RuntimeError, "Failed to install dependencies - buildlog:\n%s\n%s" % (out,err,)
+        # Wait for the p2p installer
+        if self._yum_dependencies and not self._installed:
+            self._yum_dependencies.async_setup_wait()
+            self._installed = True
 
-    def is_alive(self):
+    def is_alive(self, verbose = False):
         # Make sure all the paths are created where
         # they have to be created for deployment
         (out,err),proc = server.eintr_retry(server.popen_ssh_command)(
""" - rules = [] + # For now, sliceip results in kernel panics + # so we HAVE to use vroute + return 'vroute' + + # We should not make the routing table grow too big + if len(routes) > MAX_VROUTE_ROUTES: + return 'sliceip' + + vsys_vnet = ipaddr.IPNetwork(vsys_vnet) + for route in routes: + dest, prefix, nexthop, metric = route + dest = ipaddr.IPNetwork("%s/%d" % (dest,prefix)) + nexthop = ipaddr.IPAddress(nexthop) + if dest not in vsys_vnet or nexthop not in vsys_vnet: + return 'sliceip' + + return 'vroute' + + def format_route(self, route, dev, method, action): + dest, prefix, nexthop, metric = route + if method == 'vroute': + return ( + "%s %s%s gw %s %s" % ( + action, + dest, + (("/%d" % (prefix,)) if prefix and prefix != 32 else ""), + nexthop, + dev, + ) + ) + elif method == 'sliceip': + return ( + "route %s to %s%s via %s metric %s dev %s" % ( + action, + dest, + (("/%d" % (prefix,)) if prefix and prefix != 32 else ""), + nexthop, + metric or 1, + dev, + ) + ) + else: + raise AssertionError, "Unknown method" + + def _annotate_routes_with_devs(self, routes, devs, method): + dev_routes = [] for route in routes: for dev in devs: if dev.routes_here(route): - # Schedule rule - dest, prefix, nexthop, metric = route - rules.append( - "add %s%s gw %s %s" % ( - dest, - (("/%d" % (prefix,)) if prefix and prefix != 32 else ""), - nexthop, - dev.if_name, - ) - ) + dev_routes.append(tuple(route) + (dev.if_name,)) # Stop checking break else: - raise RuntimeError, "Route %s cannot be bound to any virtual interface " \ - "- PL can only handle rules over virtual interfaces. Candidates are: %s" % (route,devs) + if method == 'sliceip': + dev_routes.append(tuple(route) + ('eth0',)) + else: + raise RuntimeError, "Route %s cannot be bound to any virtual interface " \ + "- PL can only handle rules over virtual interfaces. 
+
+    def configure_routes(self, routes, devs, vsys_vnet):
+        """
+        Add the specified routes to the node's routing table
+        """
+        rules = []
+        method = self.routing_method(routes, vsys_vnet)
+        tdevs = set()
 
-        print >>sys.stderr, "Setting up routes for", self.hostname
+        # annotate routes with devices
+        dev_routes = self._annotate_routes_with_devs(routes, devs, method)
+        for route in dev_routes:
+            route, dev = route[:-1], route[-1]
+
+            # Schedule rule
+            tdevs.add(dev)
+            rules.append(self.format_route(route, dev, method, 'add'))
+
+        if method == 'sliceip':
+            rules = map('enable '.__add__, tdevs) + rules
+
+        self._logger.info("Setting up routes for %s using %s", self.hostname, method)
+        self._logger.debug("Routes for %s:\n\t%s", self.hostname, '\n\t'.join(rules))
+
+        self.apply_route_rules(rules, method)
+
+        self._configured_routes = set(dev_routes)
+        self._configured_devs = tdevs
+        self._configured_method = method
+
+    def reconfigure_routes(self, routes, devs, vsys_vnet):
+        """
+        Updates the routes in the node's routing table to match
+        the given route list
+        """
+        method = self._configured_method
+        dev_routes = self._annotate_routes_with_devs(routes, devs, method)
+
+        current = self._configured_routes
+        current_devs = self._configured_devs
+
+        new = set(dev_routes)
+        new_devs = set(map(operator.itemgetter(-1), dev_routes))
+
+        deletions = current - new
+        insertions = new - current
+
+        dev_deletions = current_devs - new_devs
+        dev_insertions = new_devs - current_devs
+
+        # Generate rules
+        rules = []
+
+        # Rule deletions first
+        for route in deletions:
+            route, dev = route[:-1], route[-1]
+            rules.append(self.format_route(route, dev, method, 'del'))
+
+        if method == 'sliceip':
+            # Dev deletions now
+            rules.extend(map('disable '.__add__, dev_deletions))
+
+            # Dev insertions now
+            rules.extend(map('enable '.__add__, dev_insertions))
+
+        # Rule insertions now
+        for route in insertions:
+            route, dev = route[:-1], route[-1]
+            rules.append(self.format_route(route, dev, method, 'add'))
+
+        # Apply
+        self.apply_route_rules(rules, method)
+
+        self._configured_routes = set(dev_routes)
+        self._configured_devs = new_devs
+
+    def apply_route_rules(self, rules, method):
         (out,err),proc = server.popen_ssh_command(
-            "( sudo -S bash -c 'cat /vsys/vroute.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/vroute.in' ; sleep 0.1" % dict(
-                home = server.shell_escape(self.home_path)),
+            "( sudo -S bash -c 'cat /vsys/%(method)s.out >&2' & ) ; sudo -S bash -c 'cat > /vsys/%(method)s.in' ; sleep 0.5" % dict(
+                home = server.shell_escape(self.home_path),
+                method = method),
             host = self.hostname,
             port = None,
             user = self.slicename,
            agent = None,
             ident_key = self.ident_path,
             server_key = self.server_key,
-            stdin = '\n'.join(rules)
+            stdin = '\n'.join(rules),
+            timeout = 300
             )
 
         if proc.wait() or err:
             raise RuntimeError, "Could not set routes (%s) errors: %s%s" % (rules,out,err)
-        
-        
+        elif out or err:
+            self._logger.debug("%s said: %s%s", method, out, err)
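
NOTE (illustration, not part of the patch): apply_route_rules above writes one rule per line into /vsys/<method>.in on the node, so the only difference between the two backends is the textual form of each rule as produced by format_route. Rendering the two format strings used in the patch as a standalone function, with made-up addresses:

    def format_route(route, dev, method, action):
        dest, prefix, nexthop, metric = route
        suffix = ("/%d" % prefix) if prefix and prefix != 32 else ""
        if method == 'vroute':
            return "%s %s%s gw %s %s" % (action, dest, suffix, nexthop, dev)
        elif method == 'sliceip':
            return "route %s to %s%s via %s metric %s dev %s" % (
                action, dest, suffix, nexthop, metric or 1, dev)
        raise ValueError("unknown method %r" % (method,))

    route = ("192.168.2.0", 24, "192.168.1.1", 1)
    print(format_route(route, "tun0", 'vroute', 'add'))
    # add 192.168.2.0/24 gw 192.168.1.1 tun0
    print(format_route(route, "tun0", 'sliceip', 'add'))
    # route add to 192.168.2.0/24 via 192.168.1.1 metric 1 dev tun0
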